You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/06/22 07:16:29 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add part impl of ISeriesReaderFactory

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 2db1c50  add part impl of ISeriesReaderFactory
     new c727a10  Merge branch 'feature_async_close_tsfile' of https://github.com/apache/incubator-iotdb into feature_async_close_tsfile
2db1c50 is described below

commit 2db1c50fe2efc4e49693760866aeebc0f0acd057
Author: suyue <23...@qq.com>
AuthorDate: Sat Jun 22 15:15:51 2019 +0800

    add part impl of ISeriesReaderFactory
---
 .../db/query/factory/ISeriesReaderFactory.java     |  27 ++----
 .../db/query/factory/SeriesReaderFactoryImpl.java  | 102 +++++++++++++++++----
 2 files changed, 88 insertions(+), 41 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
index dec7378..c1be4ce 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
@@ -21,9 +21,8 @@ package org.apache.iotdb.db.query.factory;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.reader.IAggregateReader;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -38,27 +37,13 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 public interface ISeriesReaderFactory {
 
 
-
   /**
    * This method is used to read all unsequence data for IoTDB request, such as query, aggregation
    * and groupby request.
    */
-  IPointReader createUnSeqReader(GlobalSortedSeriesDataSourceV2 overflowSeriesDataSource,
+  IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources,
       Filter filter) throws IOException;
 
-  /**
-   * This method is used to read all sequence data for IoTDB request, such as query, aggregation
-   * and groupby request.
-   */
-  IAggregateReader createSeqReader(GlobalSortedSeriesDataSourceV2 overflowSeriesDataSource,
-      Filter filter) throws IOException;
-
-  /**
-   * This method is used to merge only one TsFile data and one UnSeqFile data for merge process in
-   * IoTDB.
-   */
-  IPointReader createSeriesReaderForMerge(TsFileResourceV2 seqFile, TsFileResourceV2 unseqFile,
-      SingleSeriesExpression singleSeriesExpression, QueryContext context) throws IOException;
 
   /**
    * construct ByTimestampReader, including sequential data and unsequential data.
@@ -68,15 +53,15 @@ public interface ISeriesReaderFactory {
    * @return the list of EngineReaderByTimeStamp
    */
   List<EngineReaderByTimeStamp> createByTimestampReadersOfSelectedPaths(List<Path> paths,
-      QueryContext context);
+      QueryContext context) throws FileNodeManagerException;
 
   /**
    * construct IPointReader, include sequential data and unsequential data.
    *
-   * @param paths selected series path
+   * @param path selected series path
    * @param context query context
    * @return the list of EngineReaderByTimeStamp
    */
-  List<IPointReader> createReadersOfSelectedPaths(List<Path> paths,
-      QueryContext context);
+  IPointReader createAllDataReader(Path path, Filter timeFilter,
+      QueryContext context) throws FileNodeManagerException;
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
index 455254c..8fa8d19 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
@@ -1,47 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.query.factory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.reader.IAggregateReader;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.AllDataReader;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestampV2;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderV2;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-public class SeriesReaderFactoryImpl implements ISeriesReaderFactory{
+public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
 
   @Override
-  public IPointReader createUnSeqReader(GlobalSortedSeriesDataSourceV2 overflowSeriesDataSource,
+  public IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources,
       Filter filter) throws IOException {
-
     return null;
   }
 
   @Override
-  public IAggregateReader createSeqReader(GlobalSortedSeriesDataSourceV2 overflowSeriesDataSource,
-      Filter filter) throws IOException {
-    return null;
-  }
+  public List<EngineReaderByTimeStamp> createByTimestampReadersOfSelectedPaths(List<Path> paths,
+      QueryContext context) throws FileNodeManagerException {
+    List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
 
-  @Override
-  public IPointReader createSeriesReaderForMerge(TsFileResourceV2 seqFile,
-      TsFileResourceV2 unseqFile, SingleSeriesExpression singleSeriesExpression,
-      QueryContext context) throws IOException {
-    return null;
+    for (Path path : paths) {
+
+      QueryDataSourceV2 queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSourceV2(path,
+              context);
+
+      PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new PriorityMergeReaderByTimestamp();
+
+      // reader for sequence data
+      SequenceDataReaderByTimestampV2 tsFilesReader = new SequenceDataReaderByTimestampV2(path,
+          queryDataSource.getSeqResources(), context);
+      mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
+
+      // reader for unSequence data
+      //TODO add create unseq reader
+      PriorityMergeReaderByTimestamp unSeqMergeReader = createUnSeqByTimestampReader(
+          queryDataSource.getUnseqResources());
+      mergeReaderByTimestamp.addReaderWithPriority(unSeqMergeReader, 2);
+
+      readersOfSelectedSeries.add(mergeReaderByTimestamp);
+    }
+
+    return readersOfSelectedSeries;
   }
 
-  @Override
-  public List<EngineReaderByTimeStamp> createByTimestampReadersOfSelectedPaths(List<Path> paths,
-      QueryContext context) {
+  private PriorityMergeReaderByTimestamp createUnSeqByTimestampReader(
+      List<TsFileResourceV2> unseqResources) {
     return null;
   }
 
   @Override
-  public List<IPointReader> createReadersOfSelectedPaths(List<Path> paths, QueryContext context) {
-    return null;
+  public IPointReader createAllDataReader(Path path, Filter timeFilter, QueryContext context)
+      throws FileNodeManagerException {
+    QueryDataSourceV2 queryDataSource = QueryResourceManager.getInstance()
+        .getQueryDataSourceV2(path,
+            context);
+
+    // sequence reader for one sealed tsfile
+    SequenceDataReaderV2 tsFilesReader;
+    try {
+      tsFilesReader = new SequenceDataReaderV2(queryDataSource.getSeriesPath(),
+          queryDataSource.getSeqResources(),
+          timeFilter, context);
+    } catch (IOException e) {
+      throw new FileNodeManagerException(e);
+    }
+
+    // unseq reader for all chunk groups in unSeqFile
+    IPointReader unSeqMergeReader = null;
+    try {
+      unSeqMergeReader = createUnSeqReader(path, queryDataSource.getUnseqResources(), timeFilter);
+    } catch (IOException e) {
+      throw new FileNodeManagerException(e);
+    }
+    // merge sequence data with unsequence data.
+    return new AllDataReader(tsFilesReader, unSeqMergeReader);
   }
+
 }