You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/01/02 02:11:21 UTC
[incubator-iotdb] 01/01: abstract an interface ManagedSeriesReader
from SeriesReaderWithoutValueFilter
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch abstract_SeriesReaderWithoutValueFilter
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 40c6e94f97bbb5aa6720f067c786d6ae7bba5734
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Jan 2 10:10:31 2020 +0800
abstract an interface ManagedSeriesReader from SeriesReaderWithoutValueFilter
---
.../NewEngineDataSetWithoutValueFilter.java | 14 ++++----
.../iotdb/db/query/executor/EngineExecutor.java | 5 +--
.../iotdb/db/query/reader/ManagedSeriesReader.java | 37 ++++++++++++++++++++++
.../SeriesReaderWithoutValueFilter.java | 8 +++--
.../SeriesReaderWithValueFilterTest.java | 3 +-
.../SeriesReaderWithoutValueFilterTest.java | 5 +--
6 files changed, 58 insertions(+), 14 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
index e3329ce..2590fa1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
-import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -45,10 +45,10 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
private static class ReadTask implements Runnable {
- private final SeriesReaderWithoutValueFilter reader;
+ private final ManagedSeriesReader reader;
private BlockingQueue<BatchData> blockingQueue;
- public ReadTask(SeriesReaderWithoutValueFilter reader, BlockingQueue<BatchData> blockingQueue) {
+ public ReadTask(ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue) {
this.reader = reader;
this.blockingQueue = blockingQueue;
}
@@ -95,7 +95,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
}
}
- private List<SeriesReaderWithoutValueFilter> seriesReaderWithoutValueFilterList;
+ private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
private TreeSet<Long> timeHeap;
@@ -130,7 +130,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
* @param readers readers in List(IPointReader) structure
*/
public NewEngineDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> dataTypes,
- List<SeriesReaderWithoutValueFilter> readers) throws InterruptedException {
+ List<ManagedSeriesReader> readers) throws InterruptedException {
super(paths, dataTypes);
this.seriesReaderWithoutValueFilterList = readers;
blockingQueueArray = new BlockingQueue[readers.size()];
@@ -145,7 +145,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
private void init() throws InterruptedException {
timeHeap = new TreeSet<>();
for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
- SeriesReaderWithoutValueFilter reader = seriesReaderWithoutValueFilterList.get(i);
+ ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
reader.setHasRemaining(true);
reader.setManagedByQueryManager(true);
pool.submit(new ReadTask(reader, blockingQueueArray[i]));
@@ -341,7 +341,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
// we only need to judge whether to submit another task when the queue is not full
if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
- SeriesReaderWithoutValueFilter reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
+ ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
// if the reader isn't being managed and still has more data,
// that means this read task leave the pool before because the queue has no more space
// now we should submit it again
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
index 56962c5..f6582ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithValueFilter;
import org.apache.iotdb.db.query.dataset.NewEngineDataSetWithoutValueFilter;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
@@ -70,12 +71,12 @@ public class EngineExecutor {
timeFilter = ((GlobalTimeExpression) optimizedExpression).getFilter();
}
- List<SeriesReaderWithoutValueFilter> readersOfSelectedSeries = new ArrayList<>();
+ List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
for (int i = 0; i < deduplicatedPaths.size(); i++) {
Path path = deduplicatedPaths.get(i);
TSDataType dataType = deduplicatedDataTypes.get(i);
- SeriesReaderWithoutValueFilter reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
+ ManagedSeriesReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
true);
readersOfSelectedSeries.add(reader);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/ManagedSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/ManagedSeriesReader.java
new file mode 100644
index 0000000..67925be
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/ManagedSeriesReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader;
+
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+
+/**
+ * ManagedSeriesReader is a combination of IBatchReader and IPointReader that adds the methods
+ * needed for the reader to be run concurrently in a thread pool within a query.
+ */
+public interface ManagedSeriesReader extends IBatchReader, IPointReader {
+
+ boolean isManagedByQueryManager();
+
+ void setManagedByQueryManager(boolean managedByQueryManager);
+
+ boolean hasRemaining();
+
+ void setHasRemaining(boolean hasRemaining);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
index f9124fc..65d4585 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.TimeValuePair;
@@ -40,7 +40,7 @@ import java.io.IOException;
*
* "without value filter" is equivalent to "with global time filter or without any filter".
*/
-public class SeriesReaderWithoutValueFilter implements IBatchReader, IPointReader {
+public class SeriesReaderWithoutValueFilter implements ManagedSeriesReader {
private IBatchReader seqResourceIterateReader;
private IBatchReader unseqResourceMergeReader;
@@ -111,18 +111,22 @@ public class SeriesReaderWithoutValueFilter implements IBatchReader, IPointReade
this.unseqResourceMergeReader = unseqResourceMergeReader;
}
+ @Override
public boolean isManagedByQueryManager() {
return managedByQueryManager;
}
+ @Override
public void setManagedByQueryManager(boolean managedByQueryManager) {
this.managedByQueryManager = managedByQueryManager;
}
+ @Override
public boolean hasRemaining() {
return hasRemaining;
}
+ @Override
public void setHasRemaining(boolean hasRemaining) {
this.hasRemaining = hasRemaining;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java
index 3e9de56..ff02b72 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java
@@ -20,13 +20,14 @@
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.io.IOException;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.junit.Test;
public class SeriesReaderWithValueFilterTest {
- private SeriesReaderWithoutValueFilter reader;
+ private ManagedSeriesReader reader;
private void init() throws IOException {
// (100,0),(105,1),(110,0),(115,1),(120,0),...
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java
index ff06f20..141f210 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java
@@ -20,13 +20,14 @@
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.io.IOException;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.junit.Test;
public class SeriesReaderWithoutValueFilterTest {
- private SeriesReaderWithoutValueFilter reader1;
- private SeriesReaderWithoutValueFilter reader2;
+ private ManagedSeriesReader reader1;
+ private ManagedSeriesReader reader2;
private void init() throws IOException {
IBatchReader batchReader1 = new FakedIBatchPoint(100, 1000, 7, 11);