You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/06 09:09:51 UTC
[iotdb] 01/04: SingleInputSingleOutputIntermediateLayer:
constructPointReader & constructRowReader &
constructRowSlidingSizeWindowReader
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 025fd00aa3766e50e0baf0717b53893abf105375
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Sep 6 10:37:12 2021 +0800
SingleInputSingleOutputIntermediateLayer: constructPointReader & constructRowReader & constructRowSlidingSizeWindowReader
---
...SerializableTVListBackedSingleColumnWindow.java | 4 +-
.../db/query/udf/core/layer/IntermediateLayer.java | 8 +-
.../MultiInputMultiOutputIntermediateLayer.java | 27 +++-
.../MultiInputSingleOutputIntermediateLayer.java | 27 +++-
.../SingleInputMultiOutputIntermediateLayer.java | 29 +++-
.../SingleInputSingleOutputIntermediateLayer.java | 166 ++++++++++++++++++++-
.../iotdb/db/query/udf/core/layer/UDFLayer.java | 28 ++--
.../db/query/udf/core/reader/LayerRowReader.java | 2 +-
8 files changed, 262 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
index f91dd35..4157f90 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
@@ -76,14 +76,12 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
return rowIterator;
}
- private RowWindow seek(int beginIndex, int endIndex) {
+ /**
+ * Repositions this window.
+ *
+ * <p>Stores both indexes, sets {@code size} to {@code endIndex - beginIndex}, calls
+ * {@code row.seek(beginIndex)} and clears the cached {@code rowIterator}.
+ *
+ * @param beginIndex index at which the window starts
+ * @param endIndex index at which the window ends; NOTE(review): appears to be exclusive given
+ * how {@code size} is derived - confirm
+ */
+ public void seek(int beginIndex, int endIndex) {
this.beginIndex = beginIndex;
this.endIndex = endIndex;
size = endIndex - beginIndex;
row.seek(beginIndex);
rowIterator = null;
-
- return this;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index 069af89..49aae1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.query.udf.core.layer;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
@@ -41,7 +42,7 @@ public abstract class IntermediateLayer {
public abstract LayerRowReader constructRowReader();
public final LayerRowWindowReader constructRowWindowReader(
- AccessStrategy strategy, float memoryBudgetInMB) {
+ AccessStrategy strategy, float memoryBudgetInMB) throws QueryProcessException {
switch (strategy.getAccessStrategyType()) {
case SLIDING_TIME_WINDOW:
return constructRowSlidingTimeWindowReader(
@@ -56,10 +57,9 @@ public abstract class IntermediateLayer {
}
protected abstract LayerRowWindowReader constructRowSlidingSizeWindowReader(
- SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB);
+ SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
+ throws QueryProcessException;
protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader(
SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB);
-
- public abstract void updateEvictionUpperBound();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
index 9cd5422..2a70a7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
@@ -19,17 +19,40 @@
package org.apache.iotdb.db.query.udf.core.layer;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
import java.util.List;
-public class MultiInputMultiOutputIntermediateLayer implements IntermediateLayer {
+/**
+ * Intermediate layer for several parent readers.
+ *
+ * <p>In this revision the class is a placeholder: the constructor only forwards
+ * {@code queryId} and {@code memoryBudgetInMB} to the superclass (the parent readers are not
+ * stored), and every {@code construct*} method returns {@code null}.
+ */
+public class MultiInputMultiOutputIntermediateLayer extends IntermediateLayer {
public MultiInputMultiOutputIntermediateLayer(
- List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {}
+ long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
+ super(queryId, memoryBudgetInMB);
+ }
@Override
public LayerPointReader constructPointReader() {
return null;
}
+
+ @Override
+ public LayerRowReader constructRowReader() {
+ return null;
+ }
+
+ @Override
+ protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+ SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ return null;
+ }
+
+ @Override
+ protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+ SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
index 71a4f06..c63b1be 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
@@ -19,17 +19,40 @@
package org.apache.iotdb.db.query.udf.core.layer;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
import java.util.List;
-public class MultiInputSingleOutputIntermediateLayer implements IntermediateLayer {
+/**
+ * Intermediate layer that takes a list of parent point readers.
+ *
+ * <p>Not implemented yet in this revision: the list of parent readers is accepted but not
+ * kept, only {@code queryId} and {@code memoryBudgetInMB} are passed on to the superclass, and
+ * each reader factory method returns {@code null}.
+ */
+public class MultiInputSingleOutputIntermediateLayer extends IntermediateLayer {
public MultiInputSingleOutputIntermediateLayer(
- List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {}
+ long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
+ super(queryId, memoryBudgetInMB);
+ }
@Override
public LayerPointReader constructPointReader() {
return null;
}
+
+ @Override
+ public LayerRowReader constructRowReader() {
+ return null;
+ }
+
+ @Override
+ protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+ SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ return null;
+ }
+
+ @Override
+ protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+ SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
index a711ced..17467b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
@@ -20,27 +20,33 @@
package org.apache.iotdb.db.query.udf.core.layer;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import java.io.IOException;
-public class SingleInputMultiOutputIntermediateLayer implements IntermediateLayer {
+public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer {
private static final int CACHE_BLOCK_SIZE = 2;
- private final TSDataType dataType;
private final LayerPointReader parentLayerPointReader;
+ private final TSDataType dataType;
private final ElasticSerializableTVList tvList;
private final SafetyLine safetyLine;
public SingleInputMultiOutputIntermediateLayer(
- LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
+ long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader)
throws QueryProcessException {
+ super(queryId, memoryBudgetInMB);
this.parentLayerPointReader = parentLayerPointReader;
+
dataType = parentLayerPointReader.getDataType();
tvList =
ElasticSerializableTVList.newElasticSerializableTVList(
@@ -162,4 +168,21 @@ public class SingleInputMultiOutputIntermediateLayer implements IntermediateLaye
}
};
}
+
+ @Override
+ public LayerRowReader constructRowReader() {
+ return null;
+ }
+
+ @Override
+ protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+ SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ return null;
+ }
+
+ @Override
+ protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+ SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
index 131472a..3c1c705 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
@@ -19,13 +19,28 @@
package org.apache.iotdb.db.query.udf.core.layer;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnWindow;
+import org.apache.iotdb.db.query.udf.core.access.LayerPointReaderBackedSingleColumnRow;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
+import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-public class SingleInputSingleOutputIntermediateLayer implements IntermediateLayer {
+import java.io.IOException;
+
+public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer {
private final LayerPointReader parentLayerPointReader;
- public SingleInputSingleOutputIntermediateLayer(LayerPointReader parentLayerPointReader) {
+ /**
+ * Creates a layer over exactly one parent reader.
+ *
+ * @param queryId forwarded unchanged to the superclass constructor
+ * @param memoryBudgetInMB forwarded unchanged to the superclass constructor
+ * @param parentLayerPointReader the single upstream reader; kept as-is for later use
+ */
+ public SingleInputSingleOutputIntermediateLayer(
+ long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) {
+ super(queryId, memoryBudgetInMB);
this.parentLayerPointReader = parentLayerPointReader;
}
@@ -33,4 +48,151 @@ public class SingleInputSingleOutputIntermediateLayer implements IntermediateLay
public LayerPointReader constructPointReader() {
return parentLayerPointReader;
}
+
+  /**
+   * Exposes the single parent point reader through the row-reader interface.
+   *
+   * <p>The returned reader delegates to the parent reader: a row is considered available once
+   * the parent's {@code next()} has returned {@code true}, and stays available until
+   * {@code readyForNext()} is called.
+   *
+   * @return a row reader with exactly one column, typed like the parent reader
+   */
+  @Override
+  public LayerRowReader constructRowReader() {
+    return new LayerRowReader() {
+
+      private final Row singleColumnRow =
+          new LayerPointReaderBackedSingleColumnRow(parentLayerPointReader);
+      private final TSDataType[] columnTypes = {parentLayerPointReader.getDataType()};
+
+      /** True while the parent has reported a point that has not been released yet. */
+      private boolean rowReady = false;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (!rowReady) {
+          rowReady = parentLayerPointReader.next();
+        }
+        return rowReady;
+      }
+
+      @Override
+      public void readyForNext() {
+        parentLayerPointReader.readyForNext();
+        rowReady = false;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return columnTypes;
+      }
+
+      @Override
+      public long currentTime() throws IOException {
+        return parentLayerPointReader.currentTime();
+      }
+
+      @Override
+      public Row currentRow() {
+        return singleColumnRow;
+      }
+    };
+  }
+
+  /**
+   * Builds a reader that groups the parent reader's points into sliding, size-based windows.
+   *
+   * <p>Points pulled from the parent are appended to an {@link ElasticSerializableTVList}; each
+   * window is a view {@code [beginIndex, endIndex)} over that list. Consecutive windows start
+   * {@code slidingStep} points apart and hold at most {@code windowSize} points. Trailing
+   * windows may be shorter when the parent runs out of data, but a window is only reported when
+   * it contains at least one point.
+   *
+   * @param strategy supplies the window size and the sliding step
+   * @param memoryBudgetInMB memory budget handed to the backing list
+   * @return a window reader over the single parent column
+   * @throws QueryProcessException propagated from the creation of the backing list
+   */
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException {
+
+    return new LayerRowWindowReader() {
+
+      private final int windowSize = strategy.getWindowSize();
+      private final int slidingStep = strategy.getSlidingStep();
+
+      private final TSDataType dataType = parentLayerPointReader.getDataType();
+      // NOTE(review): the literal 2 looks like the cache block size (compare CACHE_BLOCK_SIZE in
+      // SingleInputMultiOutputIntermediateLayer) - confirm.
+      private final ElasticSerializableTVList tvList =
+          ElasticSerializableTVList.newElasticSerializableTVList(
+              dataType, queryId, memoryBudgetInMB, 2);
+      private final ElasticSerializableTVListBackedSingleColumnWindow window =
+          new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+      private boolean hasCached = false;
+      // Start index of the most recently produced window. It starts one step before 0 so that
+      // the first produced window begins at index 0.
+      private int beginIndex = -slidingStep;
+
+      /**
+       * Prepares the next window if none is pending.
+       *
+       * @return {@code true} if a non-empty window is available through
+       *     {@link #currentWindow()}, {@code false} if the parent has no data left for it
+       */
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (hasCached) {
+          return true;
+        }
+
+        // Work on a candidate position and commit it only when a window is really produced, so
+        // that repeated calls after exhaustion do not make beginIndex drift.
+        int nextBeginIndex = beginIndex + slidingStep;
+        int nextEndIndex = nextBeginIndex + windowSize;
+
+        int pointsToBeCollected = nextEndIndex - tvList.size();
+        if (0 < pointsToBeCollected) {
+          collectPoints(pointsToBeCollected);
+        }
+
+        // The parent may have run dry before the window could be filled (or, when slidingStep
+        // is larger than windowSize, even before its first point): clamp the end to what is
+        // stored and refuse to expose an empty or negative-sized window.
+        int availableEndIndex = Math.min(nextEndIndex, tvList.size());
+        if (availableEndIndex <= nextBeginIndex) {
+          return false;
+        }
+
+        beginIndex = nextBeginIndex;
+        window.seek(beginIndex, availableEndIndex);
+        hasCached = true;
+        return true;
+      }
+
+      /**
+       * Appends up to {@code pointNumber} points from the parent reader to the backing list.
+       *
+       * @return the number of points actually collected, which may be less than or equal to
+       *     {@code pointNumber}
+       */
+      private int collectPoints(int pointNumber) throws QueryProcessException, IOException {
+        int count = 0;
+
+        // Check the quota first so the parent is not asked for a point this call will not use.
+        while (count < pointNumber && parentLayerPointReader.next()) {
+          switch (dataType) {
+            case INT32:
+              tvList.putInt(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
+              break;
+            case INT64:
+              tvList.putLong(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
+              break;
+            case FLOAT:
+              tvList.putFloat(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
+              break;
+            case DOUBLE:
+              tvList.putDouble(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
+              break;
+            case BOOLEAN:
+              tvList.putBoolean(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
+              break;
+            case TEXT:
+              tvList.putBinary(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
+              break;
+            default:
+              // Counting a point that was never stored would desynchronise the window indexes
+              // from the list, so fail loudly instead of skipping silently.
+              throw new UnsupportedOperationException("Unsupported data type: " + dataType);
+          }
+
+          ++count;
+          parentLayerPointReader.readyForNext();
+        }
+
+        return count;
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
+
+ /**
+ * Placeholder for the sliding-time-window reader: this implementation ignores both arguments
+ * and returns {@code null}.
+ */
+ @Override
+ protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+ SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
index a613c4c..3148dd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
@@ -31,8 +31,8 @@ import org.apache.iotdb.db.query.udf.api.access.RowWindow;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.core.access.RowImpl;
-import org.apache.iotdb.db.query.udf.core.access.RowWindowImpl;
+import org.apache.iotdb.db.query.udf.core.access.MultiColumnRow;
+import org.apache.iotdb.db.query.udf.core.access.MultiColumnWindow;
import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
@@ -68,7 +68,7 @@ public class UDFLayer {
List<TSDataType> dataTypes,
List<ManagedSeriesReader> readers)
throws QueryProcessException, IOException, InterruptedException {
- constructInputLayer(
+ construct(
queryId,
memoryBudgetInMB,
new RawQueryDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true));
@@ -84,14 +84,18 @@ public class UDFLayer {
List<IReaderByTimestamp> readers,
List<Boolean> cached)
throws QueryProcessException {
- constructInputLayer(
+ construct(
queryId,
memoryBudgetInMB,
new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true));
}
- private void constructInputLayer(
- long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
+ public UDFLayer(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
+ throws QueryProcessException {
+ construct(queryId, memoryBudgetInMB, queryDataSet);
+ }
+
+ private void construct(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
throws QueryProcessException {
this.queryId = queryId;
this.queryDataSet = queryDataSet;
@@ -242,7 +246,7 @@ public class UDFLayer {
private boolean hasCachedRowRecord;
private Object[] cachedRowRecord;
- private final RowImpl row;
+ private final MultiColumnRow row;
public InputLayerRowReader(int[] columnIndexes) {
safetyPile = safetyLine.addSafetyPile();
@@ -253,7 +257,7 @@ public class UDFLayer {
hasCachedRowRecord = false;
cachedRowRecord = null;
- row = new RowImpl(columnIndexes, dataTypes);
+ row = new MultiColumnRow(columnIndexes, dataTypes);
}
@Override
@@ -321,7 +325,7 @@ public class UDFLayer {
private final int windowSize;
private final IntList rowIndexes;
- private final RowWindowImpl rowWindow;
+ private final MultiColumnWindow rowWindow;
private final int slidingStep;
@@ -343,7 +347,7 @@ public class UDFLayer {
windowSize < SerializableIntList.calculateCapacity(memoryBudgetInMB)
? new WrappedIntArray(windowSize)
: new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
- rowWindow = new RowWindowImpl(rowRecordList, columnIndexes, dataTypes, rowIndexes);
+ rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
slidingStep = accessStrategy.getSlidingStep();
@@ -452,7 +456,7 @@ public class UDFLayer {
private final long displayWindowEnd;
private final IntList rowIndexes;
- private final RowWindowImpl rowWindow;
+ private final MultiColumnWindow rowWindow;
private long nextWindowTimeBegin;
private int nextIndexBegin;
@@ -475,7 +479,7 @@ public class UDFLayer {
displayWindowEnd = accessStrategy.getDisplayWindowEnd();
rowIndexes = new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
- rowWindow = new RowWindowImpl(rowRecordList, columnIndexes, dataTypes, rowIndexes);
+ rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
nextWindowTimeBegin = accessStrategy.getDisplayWindowBegin();
nextIndexBegin = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java
index 413a24d..0bc6521 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java
@@ -33,7 +33,7 @@ public interface LayerRowReader {
TSDataType[] getDataTypes();
- long currentTime();
+ long currentTime() throws IOException;
Row currentRow();
}