You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/06/24 15:31:57 UTC

[incubator-iotdb] 01/03: fix bug of jira-121

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch qovc
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 055092562495f8c120ba6c2d74313f4dc4178b1c
Author: suyue <23...@qq.com>
AuthorDate: Mon Jun 24 22:50:39 2019 +0800

    fix bug of jira-121
---
 .../EngineExecutorWithoutTimeGenerator.java        |  3 +-
 .../db/query/factory/ISeriesReaderFactory.java     | 27 ++++--
 .../db/query/factory/SeriesReaderFactoryImpl.java  | 99 +++++++++++++++-------
 .../java/org/apache/iotdb/db/query/fill/IFill.java |  2 +-
 .../query/reader/AllDataReaderWithValueFilter.java | 74 ++++++++++++++++
 .../query/timegenerator/EngineNodeConstructor.java |  3 +-
 6 files changed, 170 insertions(+), 38 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 34d93cc..14b0b2e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -97,6 +97,7 @@ public class EngineExecutorWithoutTimeGenerator {
       throw new FileNodeManagerException(e);
     }
 
-    return SeriesReaderFactoryImpl.getInstance().createAllDataReader(path, timeFilter, context);
+    return SeriesReaderFactoryImpl.getInstance()
+        .createTimeFilterAllDataReader(path, timeFilter, context);
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
index 416bae6..0bbefb1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
@@ -40,8 +40,9 @@ public interface ISeriesReaderFactory {
    * This method is used to read all unsequence data for IoTDB request, such as query, aggregation
    * and groupby request.
    */
-  IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources, QueryContext context,
-                                 Filter filter) throws IOException;
+  IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources,
+      QueryContext context,
+      Filter filter) throws IOException;
 
 
   /**
@@ -55,12 +56,28 @@ public interface ISeriesReaderFactory {
       QueryContext context) throws FileNodeManagerException, IOException;
 
   /**
-   * construct IPointReader, include sequential data and unsequential data.
+   * construct IPointReader with <br>only time filter or no filter</br>, include sequential data and
+   * unsequential data. This reader won't filter the result of merged sequential data and
+   * unsequential data reader.
    *
    * @param path selected series path
+   * @param timeFilter time filter or null
    * @param context query context
-   * @return the list of EngineReaderByTimeStamp
+   * @return data reader including seq and unseq data source.
    */
-  IPointReader createAllDataReader(Path path, Filter timeFilter,
+  IPointReader createTimeFilterAllDataReader(Path path, Filter timeFilter,
       QueryContext context) throws FileNodeManagerException, IOException;
+
+  /**
+   * construct IPointReader with <br>value filter</br>, include sequential data and unsequential
+   * data. This reader will filter the result of merged sequential data and unsequential data
+   * reader, so if only has time filter please call createTimeFilterAllDataReader().
+   *
+   * @param path selected series path
+   * @param filter time filter or null
+   * @param context query context
+   * @return data reader including seq and unseq data source.
+   */
+  IPointReader createValueFilterAllDataReader(Path path, Filter filter, QueryContext context)
+      throws FileNodeManagerException, IOException;
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
index a5a1651..2b45073 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.query.factory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
@@ -27,6 +30,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.reader.AllDataReader;
+import org.apache.iotdb.db.query.reader.AllDataReaderWithValueFilter;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.mem.MemChunkReader;
 import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
@@ -54,10 +58,6 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
 
   private static final Logger logger = LoggerFactory.getLogger(SeriesReaderFactory.class);
@@ -70,8 +70,9 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
   }
 
   @Override
-  public IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources, QueryContext context,
-                                        Filter filter) throws IOException {
+  public IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources,
+      QueryContext context,
+      Filter filter) throws IOException {
     PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
 
     int priorityValue = 1;
@@ -80,7 +81,7 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
 
       // store only one opened file stream into manager, to avoid too many opened files
       TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
-              .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
+          .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
 
       // get modified chunk metadatas
       List<ChunkMetaData> metaDataList;
@@ -88,7 +89,8 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
         MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
         metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
         // mod
-        List<Modification> pathModifications = context.getPathModifications(tsFileResourceV2.getModFile(),
+        List<Modification> pathModifications = context
+            .getPathModifications(tsFileResourceV2.getModFile(),
                 seriesPath.getFullPath());
         if (!pathModifications.isEmpty()) {
           QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
@@ -103,10 +105,10 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
       for (ChunkMetaData chunkMetaData : metaDataList) {
 
         DigestForFilter digest = new DigestForFilter(chunkMetaData.getStartTime(),
-                chunkMetaData.getEndTime(),
-                chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MIN_VALUE),
-                chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MAX_VALUE),
-                chunkMetaData.getTsDataType());
+            chunkMetaData.getEndTime(),
+            chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MIN_VALUE),
+            chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MAX_VALUE),
+            chunkMetaData.getTsDataType());
 
         if (filter != null && !filter.satisfy(digest)) {
           continue;
@@ -114,7 +116,7 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
 
         Chunk chunk = chunkLoader.getChunk(chunkMetaData);
         ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
-                : new ChunkReaderWithoutFilter(chunk);
+            : new ChunkReaderWithoutFilter(chunk);
 
         unSeqMergeReader.addReaderWithPriority(new EngineChunkReader(chunkReader), priorityValue);
 
@@ -123,7 +125,8 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
 
       // add reader for MemTable
       if (!tsFileResourceV2.isClosed()) {
-        unSeqMergeReader.addReaderWithPriority(new MemChunkReader(tsFileResourceV2.getReadOnlyMemChunk(), filter), priorityValue++);
+        unSeqMergeReader.addReaderWithPriority(
+            new MemChunkReader(tsFileResourceV2.getReadOnlyMemChunk(), filter), priorityValue++);
       }
     }
 
@@ -132,7 +135,7 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
   }
 
   private PriorityMergeReaderByTimestamp createUnSeqByTimestampReader(Path seriesPath,
-                                                                      List<TsFileResourceV2> unSeqResources, QueryContext context) throws IOException {
+      List<TsFileResourceV2> unSeqResources, QueryContext context) throws IOException {
     PriorityMergeReaderByTimestamp unSeqMergeReader = new PriorityMergeReaderByTimestamp();
 
     int priorityValue = 1;
@@ -141,14 +144,15 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
 
       // store only one opened file stream into manager, to avoid too many opened files
       TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
-              .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
+          .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
 
       List<ChunkMetaData> metaDataList;
       if (tsFileResourceV2.isClosed()) {
         MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
         metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
         // mod
-        List<Modification> pathModifications = context.getPathModifications(tsFileResourceV2.getModFile(),
+        List<Modification> pathModifications = context
+            .getPathModifications(tsFileResourceV2.getModFile(),
                 seriesPath.getFullPath());
         if (!pathModifications.isEmpty()) {
           QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
@@ -165,14 +169,15 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
         ChunkReaderByTimestamp chunkReader = new ChunkReaderByTimestamp(chunk);
 
         unSeqMergeReader.addReaderWithPriority(new EngineChunkReaderByTimestamp(chunkReader),
-                priorityValue);
+            priorityValue);
 
         priorityValue++;
       }
 
       // add reader for MemTable
       if (!tsFileResourceV2.isClosed()) {
-        unSeqMergeReader.addReaderWithPriority(new MemChunkReaderByTimestamp(tsFileResourceV2.getReadOnlyMemChunk()), priorityValue++);
+        unSeqMergeReader.addReaderWithPriority(
+            new MemChunkReaderByTimestamp(tsFileResourceV2.getReadOnlyMemChunk()), priorityValue++);
       }
     }
 
@@ -182,7 +187,7 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
 
   @Override
   public List<EngineReaderByTimeStamp> createByTimestampReadersOfSelectedPaths(List<Path> paths,
-                                                                               QueryContext context) throws FileNodeManagerException, IOException {
+      QueryContext context) throws FileNodeManagerException, IOException {
     List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
 
     for (Path path : paths) {
@@ -190,8 +195,8 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
       QueryDataSourceV2 queryDataSource = null;
       try {
         queryDataSource = QueryResourceManager.getInstance()
-                .getQueryDataSourceV2(path,
-                        context);
+            .getQueryDataSourceV2(path,
+                context);
       } catch (ProcessorException e) {
         throw new FileNodeManagerException(e);
       }
@@ -200,13 +205,13 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
 
       // reader for sequence data
       SequenceDataReaderByTimestampV2 tsFilesReader = new SequenceDataReaderByTimestampV2(path,
-              queryDataSource.getSeqResources(), context);
+          queryDataSource.getSeqResources(), context);
       mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
 
       // reader for unSequence data
       //TODO add create unseq reader
       PriorityMergeReaderByTimestamp unSeqMergeReader = createUnSeqByTimestampReader(path,
-              queryDataSource.getUnseqResources(), context);
+          queryDataSource.getUnseqResources(), context);
       mergeReaderByTimestamp.addReaderWithPriority(unSeqMergeReader, 2);
 
       readersOfSelectedSeries.add(mergeReaderByTimestamp);
@@ -216,12 +221,13 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
   }
 
   @Override
-  public IPointReader createAllDataReader(Path path, Filter timeFilter, QueryContext context)
-          throws FileNodeManagerException, IOException {
+  public IPointReader createTimeFilterAllDataReader(Path path, Filter timeFilter,
+      QueryContext context)
+      throws FileNodeManagerException, IOException {
     QueryDataSourceV2 queryDataSource = null;
     try {
       queryDataSource = QueryResourceManager.getInstance()
-              .getQueryDataSourceV2(path, context);
+          .getQueryDataSourceV2(path, context);
     } catch (ProcessorException e) {
       throw new FileNodeManagerException(e);
     }
@@ -230,12 +236,13 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
     SequenceDataReaderV2 tsFilesReader;
 
     tsFilesReader = new SequenceDataReaderV2(queryDataSource.getSeriesPath(),
-            queryDataSource.getSeqResources(),
-            timeFilter, context);
+        queryDataSource.getSeqResources(),
+        timeFilter, context);
 
     // unseq reader for all chunk groups in unSeqFile
     IPointReader unSeqMergeReader = null;
-    unSeqMergeReader = createUnSeqReader(path, queryDataSource.getUnseqResources(), context, timeFilter);
+    unSeqMergeReader = createUnSeqReader(path, queryDataSource.getUnseqResources(), context,
+        timeFilter);
 
     if (!tsFilesReader.hasNext()) {
       //only have unsequence data.
@@ -246,6 +253,38 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
     }
   }
 
+  @Override
+  public IPointReader createValueFilterAllDataReader(Path path, Filter filter, QueryContext context)
+      throws FileNodeManagerException, IOException {
+    QueryDataSourceV2 queryDataSource = null;
+    try {
+      queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSourceV2(path, context);
+    } catch (ProcessorException e) {
+      throw new FileNodeManagerException(e);
+    }
+
+    // sequence reader for one sealed tsfile
+    SequenceDataReaderV2 tsFilesReader;
+
+    tsFilesReader = new SequenceDataReaderV2(queryDataSource.getSeriesPath(),
+        queryDataSource.getSeqResources(),
+        filter, context);
+
+    // unseq reader for all chunk groups in unSeqFile. Filter for unSeqMergeReader is null, because
+    // we won't push down filter in unsequence data source.
+    IPointReader unSeqMergeReader;
+    unSeqMergeReader = createUnSeqReader(path, queryDataSource.getUnseqResources(), context, null);
+
+    if (!tsFilesReader.hasNext()) {
+      //only have unsequence data.
+      return unSeqMergeReader;
+    } else {
+      //merge sequence data with unsequence data.
+      return new AllDataReaderWithValueFilter(tsFilesReader, unSeqMergeReader, filter);
+    }
+  }
+
   private static class SeriesReaderFactoryHelper {
 
     private static final SeriesReaderFactoryImpl INSTANCE = new SeriesReaderFactoryImpl();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index c720500..e8244af 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -54,7 +54,7 @@ public abstract class IFill {
       throws IOException, FileNodeManagerException {
     Filter timeFilter = constructFilter(beforeRange);
     allDataReader = SeriesReaderFactoryImpl.getInstance()
-        .createAllDataReader(path, timeFilter, context);
+        .createTimeFilterAllDataReader(path, timeFilter, context);
   }
 
   public abstract IPointReader getFillResult() throws IOException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReaderWithValueFilter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReaderWithValueFilter.java
new file mode 100644
index 0000000..8e5e632
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReaderWithValueFilter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader;
+
+import java.io.IOException;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * A value filter reader for read data source, including sequence data and unsequence data.
+ */
+public class AllDataReaderWithValueFilter extends AllDataReader {
+
+  private Filter filter;
+  private boolean hasCachedValue;
+  private TimeValuePair timeValuePair;
+
+  /**
+   * merge sequence reader, unsequence reader.
+   */
+  public AllDataReaderWithValueFilter(IBatchReader batchReader, IPointReader pointReader,
+      Filter filter) {
+    super(batchReader, pointReader);
+    this.filter = filter;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    if (hasCachedValue) {
+      return true;
+    }
+    while (super.hasNext()) {
+      timeValuePair = super.next();
+      if (filter.satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+        hasCachedValue = true;
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public TimeValuePair next() throws IOException {
+    if (hasCachedValue || hasNext()) {
+      hasCachedValue = false;
+      return timeValuePair;
+    } else {
+      throw new IOException("data reader is out of bound.");
+    }
+  }
+
+
+  @Override
+  public TimeValuePair current() throws IOException {
+    return timeValuePair;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
index cc68ec2..4ca159b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
@@ -51,7 +51,8 @@ public class EngineNodeConstructor extends AbstractNodeConstructor {
         Filter filter = ((SingleSeriesExpression) expression).getFilter();
         Path path = ((SingleSeriesExpression) expression).getSeriesPath();
         return new EngineLeafNode(
-            SeriesReaderFactoryImpl.getInstance().createAllDataReader(path, filter, context));
+            SeriesReaderFactoryImpl.getInstance()
+                .createValueFilterAllDataReader(path, filter, context));
       } catch (IOException e) {
         throw new FileNodeManagerException(e);
       }