You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/06/21 03:53:47 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add bwf with value filter range query

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 71b2f2d  add bwf with value filter range query
     new d222853  Merge branch 'feature_async_close_tsfile' of https://github.com/apache/incubator-iotdb into feature_async_close_tsfile
71b2f2d is described below

commit 71b2f2df86a01840da6e49089df4ab93da23d4fe
Author: suyue <23...@qq.com>
AuthorDate: Fri Jun 21 11:52:28 2019 +0800

    add bwf with value filter range query
---
 .../executor/EngineExecutorWithTimeGenerator.java  |   2 +-
 .../db/query/factory/SeriesReaderFactory.java      |  38 ++++++
 .../FileSeriesByTimestampIAggregateReader.java     |  43 ++++++
 .../sequence/SequenceDataReaderByTimestampV2.java  | 147 +++++++++++++++++++++
 .../UnSealedTsFilesReaderByTimestampV2.java        |  78 +++++++++++
 5 files changed, 307 insertions(+), 1 deletion(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
index 3b40187..9001ee0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
@@ -66,7 +66,7 @@ public class EngineExecutorWithTimeGenerator {
     try {
       timestampGenerator = new EngineTimeGenerator(queryExpression.getExpression(), context);
       readersOfSelectedSeries = SeriesReaderFactory
-          .getByTimestampReadersOfSelectedPaths(queryExpression.getSelectedSeries(), context);
+          .getByTimestampReadersOfSelectedPathsV2(queryExpression.getSelectedSeries(), context);
     } catch (IOException ex) {
       throw new FileNodeManagerException(ex);
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index b731fc5..982aafc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -42,6 +43,7 @@ import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
 import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.sequence.SealedTsFilesReader;
 import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestampV2;
 import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
 import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReaderByTimestamp;
 import org.apache.iotdb.db.utils.QueryUtils;
@@ -223,6 +225,42 @@ public class SeriesReaderFactory {
   }
 
   /**
+   * construct ByTimestampReader, include sequential data and unsequential data.
+   *
+   * @param paths selected series path
+   * @param context query context
+   * @return the list of EngineReaderByTimeStamp
+   */
+  public static List<EngineReaderByTimeStamp> getByTimestampReadersOfSelectedPathsV2(
+      List<Path> paths, QueryContext context) throws IOException, FileNodeManagerException {
+
+    List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
+
+    for (Path path : paths) {
+
+      QueryDataSourceV2 queryDataSource = QueryResourceManager.getInstance().getQueryDataSourceV2(path,
+          context);
+
+      PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new PriorityMergeReaderByTimestamp();
+
+      // reader for sequence data
+      SequenceDataReaderByTimestampV2 tsFilesReader = new SequenceDataReaderByTimestampV2(path,
+          queryDataSource.getSeqDataSource().getQueryTsFiles(), context);
+      mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
+
+      // reader for unSequence data
+      //TODO
+//      PriorityMergeReaderByTimestamp unSeqMergeReader = SeriesReaderFactory.getInstance()
+//          .createUnSeqMergeReaderByTimestamp(queryDataSource.getOverflowSeriesDataSource());
+//      mergeReaderByTimestamp.addReaderWithPriority(unSeqMergeReader, 2);
+
+      readersOfSelectedSeries.add(mergeReaderByTimestamp);
+    }
+
+    return readersOfSelectedSeries;
+  }
+
+  /**
    * This method is used to create unsequence insert reader by timestamp for IoTDB request, such as
    * query, aggregation and groupby request.
    */
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/FileSeriesByTimestampIAggregateReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/FileSeriesByTimestampIAggregateReader.java
new file mode 100644
index 0000000..810d649
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/FileSeriesByTimestampIAggregateReader.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.sequence;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp;
+
+public class FileSeriesByTimestampIAggregateReader implements EngineReaderByTimeStamp {
+
+  private SeriesReaderByTimestamp seriesReaderByTimestamp;
+
+  public FileSeriesByTimestampIAggregateReader(SeriesReaderByTimestamp seriesReaderByTimestamp) {
+    this.seriesReaderByTimestamp = seriesReaderByTimestamp;
+  }
+
+  @Override
+  public Object getValueInTimestamp(long timestamp) throws IOException {
+    return seriesReaderByTimestamp.getValueInTimestamp(timestamp);
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return seriesReaderByTimestamp.hasNext();
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestampV2.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestampV2.java
new file mode 100644
index 0000000..d3abac2
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestampV2.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.sequence;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
+import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp;
+
+public class SequenceDataReaderByTimestampV2 implements EngineReaderByTimeStamp {
+
+  protected Path seriesPath;
+  private List<TsFileResourceV2> tsFileResourceV2List;
+  private int nextIntervalFileIndex;
+  protected EngineReaderByTimeStamp seriesReader;
+  private QueryContext context;
+
+  /**
+   * init with seriesPath and sealedTsFiles.
+   */
+  public SequenceDataReaderByTimestampV2(Path seriesPath,
+      List<TsFileResourceV2> tsFileResourceV2List,
+      QueryContext context) {
+    this.seriesPath = seriesPath;
+    this.tsFileResourceV2List = tsFileResourceV2List;
+    this.nextIntervalFileIndex = 0;
+    this.seriesReader = null;
+    this.context = context;
+  }
+
+  @Override
+  public Object getValueInTimestamp(long timestamp) throws IOException {
+    Object value = null;
+    if (seriesReader != null) {
+      value = seriesReader.getValueInTimestamp(timestamp);
+      if (value != null || seriesReader.hasNext()) {
+        return value;
+      }
+    }
+    constructReader(timestamp);
+    if (seriesReader != null) {
+      value = seriesReader.getValueInTimestamp(timestamp);
+      if (value != null || seriesReader.hasNext()) {
+        return value;
+      }
+    }
+
+    return value;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    if (seriesReader != null && seriesReader.hasNext()) {
+      return true;
+    }
+    while (nextIntervalFileIndex < tsFileResourceV2List.size()) {
+      initSealedTsFileReader(tsFileResourceV2List.get(nextIntervalFileIndex), context);
+      nextIntervalFileIndex++;
+      if (seriesReader.hasNext()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * construct reader with the file that might overlap this timestamp.
+   */
+  private void constructReader(long timestamp) throws IOException {
+    while (nextIntervalFileIndex < tsFileResourceV2List.size()) {
+      TsFileResourceV2 tsFile = tsFileResourceV2List.get(nextIntervalFileIndex);
+      nextIntervalFileIndex++;
+      if (!tsFile.isClosed()) {
+        initUnSealedTsFileReader(tsFile, context);
+        break;
+      }
+      if (singleTsFileSatisfied(tsFile, timestamp)) {
+        initSealedTsFileReader(tsFile, context);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Judge whether the file should be skipped.
+   */
+  private boolean singleTsFileSatisfied(TsFileResourceV2 fileNode, long timestamp) {
+    if (fileNode.isClosed()) {
+      return fileNode.getEndTimeMap().get(seriesPath.getDevice()) >= timestamp;
+    }
+    return true;
+  }
+
+  private void initUnSealedTsFileReader(TsFileResourceV2 tsFile, QueryContext context)
+      throws IOException {
+    seriesReader = new UnSealedTsFilesReaderByTimestampV2(tsFile);
+  }
+
+  private void initSealedTsFileReader(TsFileResourceV2 fileNode, QueryContext context)
+      throws IOException {
+
+    // to avoid too many opened files
+    TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
+        .get(fileNode.getFile().getPath(), true);
+
+    MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
+    List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
+
+    List<Modification> pathModifications = context.getPathModifications(fileNode.getModFile(),
+        seriesPath.getFullPath());
+    if (!pathModifications.isEmpty()) {
+      QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
+    }
+    ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
+
+    seriesReader = new FileSeriesByTimestampIAggregateReader(
+        new SeriesReaderByTimestamp(chunkLoader, metaDataList));
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestampV2.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestampV2.java
new file mode 100644
index 0000000..3a93346
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestampV2.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.sequence;
+
+import java.io.IOException;
+import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
+import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp;
+
+public class UnSealedTsFilesReaderByTimestampV2 implements EngineReaderByTimeStamp {
+
+  protected Path seriesPath;
+  private SeriesReaderByTimestamp unSealedReader;
+  private EngineReaderByTimeStamp memSeriesReader;
+  private boolean unSealedReaderEnded;
+
+  /**
+   * Construct funtion for UnSealedTsFileReader.
+   *
+   * @param tsFileResource -unclosed tsfile resource
+   */
+  public UnSealedTsFilesReaderByTimestampV2(TsFileResourceV2 tsFileResource) throws IOException {
+    TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
+        .get(tsFileResource.getFile().getPath(), false);
+    ChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
+    unSealedReader = new SeriesReaderByTimestamp(chunkLoader,
+        tsFileResource.getChunkMetaDatas());
+
+    memSeriesReader = new MemChunkReaderByTimestamp(tsFileResource.getReadOnlyMemChunk());
+    unSealedReaderEnded = false;
+  }
+
+  @Override
+  public Object getValueInTimestamp(long timestamp) throws IOException {
+    Object value = null;
+    if (!unSealedReaderEnded) {
+      value = unSealedReader.getValueInTimestamp(timestamp);
+    }
+    if (value != null || unSealedReader.hasNext()) {
+      return value;
+    } else {
+      unSealedReaderEnded = true;
+    }
+    return memSeriesReader.getValueInTimestamp(timestamp);
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    if (unSealedReaderEnded) {
+      return memSeriesReader.hasNext();
+    }
+    return (unSealedReader.hasNext() || memSeriesReader.hasNext());
+  }
+
+}