You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/06/22 06:34:28 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: refactor queryDataSource (#198)

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new a1d9eaf  refactor queryDataSource (#198)
a1d9eaf is described below

commit a1d9eafd02e2b288e5f784e332ed6c0a26230a9f
Author: RUI, LEI <33...@users.noreply.github.com>
AuthorDate: Sat Jun 22 14:34:24 2019 +0800

    refactor queryDataSource (#198)
    
    * refactor queryDataSource
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 10 ++--
 .../filenodeV2/UnsealedTsFileProcessorV2.java      |  2 +
 .../GlobalSortedSeriesDataSourceV2.java            | 54 ----------------------
 .../db/engine/querycontext/QueryDataSourceV2.java  | 33 +++++++------
 .../iotdb/db/query/control/JobFileManager.java     |  4 +-
 .../AbstractExecutorWithoutTimeGeneratorV2.java    |  2 +-
 .../db/query/executor/AggregateEngineExecutor.java |  1 +
 .../db/query/factory/SeriesReaderFactory.java      |  2 +-
 .../reader/sequence/SequenceDataReaderV2.java      | 31 ++++++-------
 .../engine/filenodeV2/FileNodeProcessorV2Test.java | 12 ++---
 10 files changed, 50 insertions(+), 101 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 73dcc08..8117628 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -33,7 +33,6 @@ import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
@@ -244,14 +243,11 @@ public class FileNodeProcessorV2 {
 
     lock.readLock().lock();
     try {
-      List<TsFileResourceV2> sequnceResources = getFileReSourceListForQuery(sequenceFileList,
+      List<TsFileResourceV2> seqResources = getFileReSourceListForQuery(sequenceFileList,
           deviceId, measurementId);
-      List<TsFileResourceV2> unsequnceResources = getFileReSourceListForQuery(unSequenceFileList,
+      List<TsFileResourceV2> unseqResources = getFileReSourceListForQuery(unSequenceFileList,
           deviceId, measurementId);
-      return new QueryDataSourceV2(
-          new GlobalSortedSeriesDataSourceV2(new Path(deviceId, measurementId), sequnceResources),
-          new GlobalSortedSeriesDataSourceV2(new Path(deviceId, measurementId),
-              unsequnceResources));
+      return new QueryDataSourceV2(new Path(deviceId, measurementId), seqResources, unseqResources);
     } finally {
       lock.readLock().unlock();
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 1d85bd1..cad3b29 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -318,6 +318,8 @@ public class UnsealedTsFileProcessorV2 {
           Collections.emptyMap());
       return new Pair<>(timeValuePairSorter,
           writer.getVisibleMetadatas(deviceId, measurementId, dataType));
+      //RL: TODO 后面查询处理时认为这里返回的List<ChunkMetaData>是已经被mod处理过的?
+
     } finally {
       flushQueryLock.readLock().unlock();
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSourceV2.java
deleted file mode 100644
index a5b5bfe..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSourceV2.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.querycontext;
-
-import java.util.List;
-import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
-import org.apache.iotdb.tsfile.read.common.Path;
-
-public class GlobalSortedSeriesDataSourceV2 {
-
-  private Path seriesPath;
-
-  // including sealed tsfile and unsealed tsfile
-  private List<TsFileResourceV2> queryTsFiles;
-
-
-  public GlobalSortedSeriesDataSourceV2(Path seriesPath, List<TsFileResourceV2> queryTsFiles) {
-    this.seriesPath = seriesPath;
-    this.queryTsFiles = queryTsFiles;
-  }
-
-  public Path getSeriesPath() {
-    return seriesPath;
-  }
-
-  public void setSeriesPath(Path seriesPath) {
-    this.seriesPath = seriesPath;
-  }
-
-  public List<TsFileResourceV2> getQueryTsFiles() {
-    return queryTsFiles;
-  }
-
-  public void setQueryTsFiles(
-      List<TsFileResourceV2> queryTsFiles) {
-    this.queryTsFiles = queryTsFiles;
-  }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSourceV2.java
index fb12950..52c21cb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSourceV2.java
@@ -16,27 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.engine.querycontext;
 
-public class QueryDataSourceV2 {
+import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
+import org.apache.iotdb.tsfile.read.common.Path;
 
-  // sequence data source
-  private GlobalSortedSeriesDataSourceV2 sequenceDataSource;
+import java.util.List;
 
-  // unSequence data source
-  private GlobalSortedSeriesDataSourceV2 unSequenceDataSource;
+public class QueryDataSourceV2 {
+  private Path seriesPath;
+  private List<TsFileResourceV2> seqResources;
+  private List<TsFileResourceV2> unseqResources;
+
+  public QueryDataSourceV2(Path seriesPath, List<TsFileResourceV2> seqResources, List<TsFileResourceV2> unseqResources) {
+    this.seriesPath = seriesPath;
+    this.seqResources = seqResources;
+    this.unseqResources = unseqResources;
+  }
 
-  public QueryDataSourceV2(GlobalSortedSeriesDataSourceV2 sequenceDataSource,
-      GlobalSortedSeriesDataSourceV2 unSequenceDataSource) {
-    this.sequenceDataSource = sequenceDataSource;
-    this.unSequenceDataSource = unSequenceDataSource;
+  public Path getSeriesPath() {
+    return seriesPath;
   }
 
-  public GlobalSortedSeriesDataSourceV2 getSeqDataSource() {
-    return sequenceDataSource;
+  public List<TsFileResourceV2> getSeqResources() {
+    return seqResources;
   }
 
-  public GlobalSortedSeriesDataSourceV2 getUnSequenceDataSource() {
-    return unSequenceDataSource;
+  public List<TsFileResourceV2> getUnseqResources() {
+    return unseqResources;
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
index 256c9fc..5a17544 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
@@ -82,7 +82,7 @@ public class JobFileManager {
   public void addUsedFilesForGivenJob(long jobId, QueryDataSourceV2 dataSource) {
 
     //sequence data
-    for(TsFileResourceV2 tsFileResource : dataSource.getSeqDataSource().getQueryTsFiles()){
+    for(TsFileResourceV2 tsFileResource : dataSource.getSeqResources()){
       String path = tsFileResource.getFile().getPath();
       if(tsFileResource.isClosed()){
         addFilePathToMap(jobId, path, true);
@@ -93,7 +93,7 @@ public class JobFileManager {
     }
 
     //overflow data
-    for(TsFileResourceV2 tsFileResource : dataSource.getUnSequenceDataSource().getQueryTsFiles()){
+    for(TsFileResourceV2 tsFileResource : dataSource.getUnseqResources()){
       String path = tsFileResource.getFile().getPath();
       if(tsFileResource.isClosed()){
         addFilePathToMap(jobId, path, true);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractExecutorWithoutTimeGeneratorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractExecutorWithoutTimeGeneratorV2.java
index b4db49b..b5fbb2e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractExecutorWithoutTimeGeneratorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractExecutorWithoutTimeGeneratorV2.java
@@ -64,7 +64,7 @@ public abstract class AbstractExecutorWithoutTimeGeneratorV2 {
     // sequence reader for one sealed tsfile
     SequenceDataReaderV2 tsFilesReader;
     try {
-      tsFilesReader = new SequenceDataReaderV2(queryDataSource.getSeqDataSource(),
+      tsFilesReader = new SequenceDataReaderV2(queryDataSource.getSeriesPath(), queryDataSource.getSeqResources(),
           timeFilter, context);
     } catch (IOException e) {
       throw new FileNodeManagerException(e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 508a787..f91a8ff 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -115,6 +115,7 @@ public class AggregateEngineExecutor {
       // unseq reader for all chunk groups in unSeqFile, memory
       PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
           .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+
       readersOfSequenceData.add(sequenceReader);
       readersOfUnSequenceData.add(unSeqMergeReader);
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 982aafc..ffd335b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -245,7 +245,7 @@ public class SeriesReaderFactory {
 
       // reader for sequence data
       SequenceDataReaderByTimestampV2 tsFilesReader = new SequenceDataReaderByTimestampV2(path,
-          queryDataSource.getSeqDataSource().getQueryTsFiles(), context);
+          queryDataSource.getSeqResources(), context);
       mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
 
       // reader for unSequence data
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderV2.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderV2.java
index b97d40d..f785e30 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderV2.java
@@ -18,12 +18,8 @@
  */
 package org.apache.iotdb.db.query.reader.sequence;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
 import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.reader.IAggregateReader;
@@ -40,6 +36,10 @@ import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
 import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter;
 import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * batch reader of data in: 1) sealed tsfile. 2) unsealed tsfile, which include data in disk of
  * unsealed file and in memtables that will be flushing to unsealed tsfile.
@@ -56,26 +56,25 @@ public class SequenceDataReaderV2 extends IterateReader {
   /**
    * init with globalSortedSeriesDataSource, filter, context and isReverse.
    *
-   * @param sources data source
-   * @param filter null if no filter
+   * @param seriesPath data source
+   * @param seqResources null if no filter
    * @param context query context
    * @param isReverse true-traverse chunks from behind forward, false-traverse chunks from front to
    * back.
    */
-  public SequenceDataReaderV2(GlobalSortedSeriesDataSourceV2 sources, Filter filter,
-      QueryContext context, boolean isReverse) throws IOException {
+  public SequenceDataReaderV2(Path seriesPath, List<TsFileResourceV2> seqResources, Filter timeFilter, QueryContext context, boolean isReverse) throws IOException {
     super();
-    this.seriesPath = sources.getSeriesPath();
+    this.seriesPath = seriesPath;
     this.enableReverse = isReverse;
     if (isReverse) {
-      Collections.reverse(sources.getQueryTsFiles());
+      Collections.reverse(seqResources);
     }
-    for (TsFileResourceV2 tsFileResource : sources.getQueryTsFiles()) {
+    for (TsFileResourceV2 tsFileResource : seqResources) {
       if (tsFileResource.isClosed()) {
-        constructSealedTsFileReader(tsFileResource, filter, context, seriesReaders);
+        constructSealedTsFileReader(tsFileResource, timeFilter, context, seriesReaders);
       } else {
         seriesReaders.add(
-            new UnSealedTsFileReaderV2(tsFileResource, filter, enableReverse));
+            new UnSealedTsFileReaderV2(tsFileResource, timeFilter, enableReverse));
       }
     }
 
@@ -84,10 +83,8 @@ public class SequenceDataReaderV2 extends IterateReader {
   /**
    * traverse chunks from front to back.
    */
-  public SequenceDataReaderV2(GlobalSortedSeriesDataSourceV2 sources, Filter filter,
-      QueryContext context) throws IOException {
-    this(sources, filter, context, false);
-
+  public SequenceDataReaderV2(Path seriesPath, List<TsFileResourceV2> seqResources, Filter timeFilter, QueryContext context) throws IOException {
+    this(seriesPath, seqResources, timeFilter, context, false);
   }
 
   private void constructSealedTsFileReader(TsFileResourceV2 tsFileResource, Filter filter,
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index e14ebbe..31796c6 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -62,8 +62,8 @@ public class FileNodeProcessorV2Test {
 
     processor.syncCloseFileNode();
     QueryDataSourceV2 queryDataSource = processor.query(deviceId, measurementId);
-    Assert.assertEquals(queryDataSource.getSeqDataSource().getQueryTsFiles().size(), 100);
-    for (TsFileResourceV2 resource : queryDataSource.getSeqDataSource().getQueryTsFiles()) {
+    Assert.assertEquals(queryDataSource.getSeqResources().size(), 100);
+    for (TsFileResourceV2 resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
     }
   }
@@ -90,12 +90,12 @@ public class FileNodeProcessorV2Test {
     processor.syncCloseFileNode();
 
     QueryDataSourceV2 queryDataSource = processor.query(deviceId, measurementId);
-    Assert.assertEquals(10, queryDataSource.getSeqDataSource().getQueryTsFiles().size());
-    Assert.assertEquals(10, queryDataSource.getUnSequenceDataSource().getQueryTsFiles().size());
-    for (TsFileResourceV2 resource : queryDataSource.getSeqDataSource().getQueryTsFiles()) {
+    Assert.assertEquals(10, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
+    for (TsFileResourceV2 resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
     }
-    for (TsFileResourceV2 resource : queryDataSource.getUnSequenceDataSource().getQueryTsFiles()) {
+    for (TsFileResourceV2 resource : queryDataSource.getUnseqResources()) {
       Assert.assertTrue(resource.isClosed());
     }
   }