You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/01/02 02:11:21 UTC

[incubator-iotdb] 01/01: abstract an interface ManagedSeriesReader from SeriesReaderWithoutValueFilter

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch abstract_SeriesReaderWithoutValueFilter
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 40c6e94f97bbb5aa6720f067c786d6ae7bba5734
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Jan 2 10:10:31 2020 +0800

    abstract an interface ManagedSeriesReader from SeriesReaderWithoutValueFilter
---
 .../NewEngineDataSetWithoutValueFilter.java        | 14 ++++----
 .../iotdb/db/query/executor/EngineExecutor.java    |  5 +--
 .../iotdb/db/query/reader/ManagedSeriesReader.java | 37 ++++++++++++++++++++++
 .../SeriesReaderWithoutValueFilter.java            |  8 +++--
 .../SeriesReaderWithValueFilterTest.java           |  3 +-
 .../SeriesReaderWithoutValueFilterTest.java        |  5 +--
 6 files changed, 58 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
index e3329ce..2590fa1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.query.dataset;
 
 import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
-import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -45,10 +45,10 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
 
   private static class ReadTask implements Runnable {
 
-    private final SeriesReaderWithoutValueFilter reader;
+    private final ManagedSeriesReader reader;
     private BlockingQueue<BatchData> blockingQueue;
 
-    public ReadTask(SeriesReaderWithoutValueFilter reader, BlockingQueue<BatchData> blockingQueue) {
+    public ReadTask(ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
     }
@@ -95,7 +95,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
     }
   }
 
-  private List<SeriesReaderWithoutValueFilter> seriesReaderWithoutValueFilterList;
+  private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
 
   private TreeSet<Long> timeHeap;
 
@@ -130,7 +130,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
    * @param readers readers in List(IPointReader) structure
    */
   public NewEngineDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> dataTypes,
-                                            List<SeriesReaderWithoutValueFilter> readers) throws InterruptedException {
+                                            List<ManagedSeriesReader> readers) throws InterruptedException {
     super(paths, dataTypes);
     this.seriesReaderWithoutValueFilterList = readers;
     blockingQueueArray = new BlockingQueue[readers.size()];
@@ -145,7 +145,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
   private void init() throws InterruptedException {
     timeHeap = new TreeSet<>();
     for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
-      SeriesReaderWithoutValueFilter reader = seriesReaderWithoutValueFilterList.get(i);
+      ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
       pool.submit(new ReadTask(reader, blockingQueueArray[i]));
@@ -341,7 +341,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
       synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
         // we only need to judge whether to submit another task when the queue is not full
         if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
-          SeriesReaderWithoutValueFilter reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
+          ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
           // if the reader isn't being managed and still has more data,
           // that means this read task leave the pool before because the queue has no more space
           // now we should submit it again
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
index 56962c5..f6582ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithValueFilter;
 import org.apache.iotdb.db.query.dataset.NewEngineDataSetWithoutValueFilter;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
 import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter;
 import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
@@ -70,12 +71,12 @@ public class EngineExecutor {
       timeFilter = ((GlobalTimeExpression) optimizedExpression).getFilter();
     }
 
-    List<SeriesReaderWithoutValueFilter> readersOfSelectedSeries = new ArrayList<>();
+    List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
     for (int i = 0; i < deduplicatedPaths.size(); i++) {
       Path path = deduplicatedPaths.get(i);
       TSDataType dataType = deduplicatedDataTypes.get(i);
 
-      SeriesReaderWithoutValueFilter reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
+      ManagedSeriesReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
           true);
       readersOfSelectedSeries.add(reader);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/ManagedSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/ManagedSeriesReader.java
new file mode 100644
index 0000000..67925be
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/ManagedSeriesReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader;
+
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+
+/**
+ * ManagedSeriesReader is a combination of IBatchReader and IPointReader that provides
+ * additional interfaces to make it able to be run in a thread pool concurrently within a query.
+ */
+public interface ManagedSeriesReader extends IBatchReader, IPointReader {
+
+  boolean isManagedByQueryManager();
+
+  void setManagedByQueryManager(boolean managedByQueryManager);
+
+  boolean hasRemaining();
+
+  void setHasRemaining(boolean hasRemaining);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
index f9124fc..65d4585 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
 import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
 import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
@@ -40,7 +40,7 @@ import java.io.IOException;
  *
  * "without value filter" is equivalent to "with global time filter or without any filter".
  */
-public class SeriesReaderWithoutValueFilter implements IBatchReader, IPointReader {
+public class SeriesReaderWithoutValueFilter implements ManagedSeriesReader {
 
   private IBatchReader seqResourceIterateReader;
   private IBatchReader unseqResourceMergeReader;
@@ -111,18 +111,22 @@ public class SeriesReaderWithoutValueFilter implements IBatchReader, IPointReade
     this.unseqResourceMergeReader = unseqResourceMergeReader;
   }
 
+  @Override
   public boolean isManagedByQueryManager() {
     return managedByQueryManager;
   }
 
+  @Override
   public void setManagedByQueryManager(boolean managedByQueryManager) {
     this.managedByQueryManager = managedByQueryManager;
   }
 
+  @Override
   public boolean hasRemaining() {
     return hasRemaining;
   }
 
+  @Override
   public void setHasRemaining(boolean hasRemaining) {
     this.hasRemaining = hasRemaining;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java
index 3e9de56..ff02b72 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java
@@ -20,13 +20,14 @@
 package org.apache.iotdb.db.query.reader.seriesRelated;
 
 import java.io.IOException;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
 import org.junit.Test;
 
 public class SeriesReaderWithValueFilterTest {
 
-  private SeriesReaderWithoutValueFilter reader;
+  private ManagedSeriesReader reader;
 
   private void init() throws IOException {
     // (100,0),(105,1),(110,0),(115,1),(120,0),...
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java
index ff06f20..141f210 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java
@@ -20,13 +20,14 @@
 package org.apache.iotdb.db.query.reader.seriesRelated;
 
 import java.io.IOException;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.junit.Test;
 
 public class SeriesReaderWithoutValueFilterTest {
 
-  private SeriesReaderWithoutValueFilter reader1;
-  private SeriesReaderWithoutValueFilter reader2;
+  private ManagedSeriesReader reader1;
+  private ManagedSeriesReader reader2;
 
   private void init() throws IOException {
     IBatchReader batchReader1 = new FakedIBatchPoint(100, 1000, 7, 11);