You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/06 09:09:52 UTC
[iotdb] 02/04: SingleInputSingleOutputIntermediateLayer:
constructRowSlidingTimeWindowReader
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9e61bd6a0d3259a540e71c94f373d1cbdc65bf57
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Sep 6 11:31:08 2021 +0800
SingleInputSingleOutputIntermediateLayer: constructRowSlidingTimeWindowReader
---
.../db/query/udf/core/layer/IntermediateLayer.java | 5 +-
.../SingleInputSingleOutputIntermediateLayer.java | 175 ++++++++++++++++-----
2 files changed, 137 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index 49aae1a..c7ae82c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
+import java.io.IOException;
+
public abstract class IntermediateLayer {
protected final long queryId;
@@ -61,5 +63,6 @@ public abstract class IntermediateLayer {
throws QueryProcessException;
protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader(
- SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB);
+ SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+ throws QueryProcessException, IOException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
index 3c1c705..452ad3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
@@ -37,11 +37,13 @@ import java.io.IOException;
public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer {
private final LayerPointReader parentLayerPointReader;
+ private final TSDataType dataType;
public SingleInputSingleOutputIntermediateLayer(
long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) {
super(queryId, memoryBudgetInMB);
this.parentLayerPointReader = parentLayerPointReader;
+ dataType = parentLayerPointReader.getDataType();
}
@Override
@@ -101,7 +103,6 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
private final int windowSize = strategy.getWindowSize();
private final int slidingStep = strategy.getSlidingStep();
- private final TSDataType dataType = parentLayerPointReader.getDataType();
private final ElasticSerializableTVList tvList =
ElasticSerializableTVList.newElasticSerializableTVList(
dataType, queryId, memoryBudgetInMB, 2);
@@ -122,7 +123,7 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
int pointsToBeCollected = endIndex - tvList.size();
if (0 < pointsToBeCollected) {
- hasCached = collectPoints(pointsToBeCollected) != 0;
+ hasCached = collectPoints(pointsToBeCollected, tvList) != 0;
window.seek(beginIndex, tvList.size());
} else {
hasCached = true;
@@ -132,50 +133,105 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
return hasCached;
}
- /** @return number of actually collected, which may be less than or equals to pointNumber */
- private int collectPoints(int pointNumber) throws QueryProcessException, IOException {
- int count = 0;
-
- while (parentLayerPointReader.next() && count < pointNumber) {
- ++count;
-
- switch (dataType) {
- case INT32:
- tvList.putInt(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
- break;
- case INT64:
- tvList.putLong(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
- break;
- case FLOAT:
- tvList.putFloat(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
- break;
- case DOUBLE:
- tvList.putDouble(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
- break;
- case BOOLEAN:
- tvList.putBoolean(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
- break;
- case TEXT:
- tvList.putBinary(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
- break;
- default:
+ @Override
+ public void readyForNext() {
+ hasCached = false;
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return new TSDataType[] {dataType};
+ }
+
+ @Override
+ public RowWindow currentWindow() {
+ return window;
+ }
+ };
+ }
+
+ @Override
+ protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+ SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+ throws QueryProcessException, IOException {
+
+ final long timeInterval = strategy.getTimeInterval();
+ final long slidingStep = strategy.getSlidingStep();
+ final long displayWindowEnd = strategy.getDisplayWindowEnd();
+
+ final ElasticSerializableTVList tvList =
+ ElasticSerializableTVList.newElasticSerializableTVList(
+ dataType, queryId, memoryBudgetInMB, 2);
+ final ElasticSerializableTVListBackedSingleColumnWindow window =
+ new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+ long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
+ if (tvList.size() == 0 && parentLayerPointReader.next()) {
+ collectPoints(1, tvList);
+
+ if (nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+ // display window begin should be set to the same as the min timestamp of the query result
+ // set
+ nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
+ }
+ }
+ long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy;
+
+ final boolean hasAtLeastOneRow = tvList.size() != 0;
+
+ return new LayerRowWindowReader() {
+
+ private long nextWindowTimeBegin = finalNextWindowTimeBeginGivenByStrategy;
+ private int nextIndexBegin = 0;
+
+ @Override
+ public boolean next() throws IOException, QueryProcessException {
+ if (displayWindowEnd <= nextWindowTimeBegin) {
+ return false;
+ }
+ if (!hasAtLeastOneRow || 0 < tvList.size()) {
+ return true;
+ }
+
+ long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);
+ int oldTVListSize = tvList.size();
+ while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) {
+ if (parentLayerPointReader.next()) {
+ collectPoints(1, tvList);
+ } else if (displayWindowEnd == Long.MAX_VALUE
+ // display window end == the max timestamp of the query result set
+ && oldTVListSize == tvList.size()) {
+ return false;
+ } else {
+ break;
}
+ }
- parentLayerPointReader.readyForNext();
+ for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+ if (nextWindowTimeBegin <= tvList.getTime(i)) {
+ nextIndexBegin = i;
+ break;
+ }
+ if (i == tvList.size() - 1) {
+ nextIndexBegin = tvList.size();
+ }
}
- return count;
+ int nextIndexEnd = tvList.size();
+ for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+ if (nextWindowTimeEnd <= tvList.getTime(i)) {
+ nextIndexEnd = i;
+ break;
+ }
+ }
+ window.seek(nextIndexBegin, nextIndexEnd);
+
+ return true;
}
@Override
public void readyForNext() {
- hasCached = false;
+ nextWindowTimeBegin += slidingStep;
}
@Override
@@ -190,9 +246,44 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
};
}
- @Override
- protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
- SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
- return null;
+ /** @return number of actually collected, which may be less than or equals to pointNumber */
+ private int collectPoints(int pointNumber, ElasticSerializableTVList tvList)
+ throws QueryProcessException, IOException {
+ int count = 0;
+
+ while (parentLayerPointReader.next() && count < pointNumber) {
+ ++count;
+
+ switch (dataType) {
+ case INT32:
+ tvList.putInt(parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
+ break;
+ case INT64:
+ tvList.putLong(
+ parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
+ break;
+ case FLOAT:
+ tvList.putFloat(
+ parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
+ break;
+ case DOUBLE:
+ tvList.putDouble(
+ parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
+ break;
+ case BOOLEAN:
+ tvList.putBoolean(
+ parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
+ break;
+ case TEXT:
+ tvList.putBinary(
+ parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
+ break;
+ default:
+ }
+
+ parentLayerPointReader.readyForNext();
+ }
+
+ return count;
}
}