You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/07 08:32:23 UTC
[iotdb] branch nested-operations updated:
MultiInputColumnIntermediateLayer
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/nested-operations by this push:
new 73b717b MultiInputColumnIntermediateLayer
73b717b is described below
commit 73b717b0d571c0e4b8a789f685d0cff894c2484d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Sep 7 16:31:46 2021 +0800
MultiInputColumnIntermediateLayer
---
.../iotdb/db/query/expression/Expression.java | 3 +-
.../query/expression/unary/FunctionExpression.java | 8 +-
...ializableRowRecordListBackedMultiColumnRow.java | 97 ++++++
...zableRowRecordListBackedMultiColumnWindow.java} | 39 +--
...RecordListBackedMultiColumnWindowIterator.java} | 20 +-
...SerializableTVListBackedSingleColumnWindow.java | 4 -
...ableTVListBackedSingleColumnWindowIterator.java | 2 +
.../db/query/udf/core/layer/LayerCacheUtils.java | 23 ++
.../layer/MultiInputColumnIntermediateLayer.java | 337 +++++++++++++++++++++
...InputColumnMultiReferenceIntermediateLayer.java | 58 ----
...nputColumnSingleReferenceIntermediateLayer.java | 58 ----
.../row/ElasticSerializableRowRecordList.java | 9 +-
12 files changed, 504 insertions(+), 154 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 09a15ad..10b4117 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -55,7 +56,7 @@ public abstract class Expression {
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
- throws QueryProcessException;
+ throws QueryProcessException, IOException;
public String getExpressionString() {
if (expressionString == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 10b3811..462ba07 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -28,10 +28,11 @@ import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.MultiInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.MultiInputColumnIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -153,7 +154,7 @@ public class FunctionExpression extends Expression {
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
- throws QueryProcessException {
+ throws QueryProcessException, IOException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
List<LayerPointReader> parentLayerPointReaders = new ArrayList<>();
for (Expression expression : expressions) {
@@ -165,8 +166,7 @@ public class FunctionExpression extends Expression {
}
expressionIntermediateLayerMap.put(
- this,
- new MultiInputColumnMultiReferenceIntermediateLayer(parentLayerPointReaders, -1, -1));
+ this, new MultiInputColumnIntermediateLayer(-1, -1, parentLayerPointReaders));
}
return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnRow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnRow.java
new file mode 100644
index 0000000..d29b0d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnRow.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.access;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class ElasticSerializableRowRecordListBackedMultiColumnRow implements Row {
+
+ private final TSDataType[] dataTypes;
+ private final int size;
+
+ private Object[] rowRecord;
+
+ public ElasticSerializableRowRecordListBackedMultiColumnRow(TSDataType[] dataTypes) {
+ this.dataTypes = dataTypes;
+ size = dataTypes.length;
+ }
+
+ @Override
+ public long getTime() {
+ return (long) rowRecord[size];
+ }
+
+ @Override
+ public int getInt(int columnIndex) {
+ return (int) rowRecord[columnIndex];
+ }
+
+ @Override
+ public long getLong(int columnIndex) {
+ return (long) rowRecord[columnIndex];
+ }
+
+ @Override
+ public float getFloat(int columnIndex) {
+ return (float) rowRecord[columnIndex];
+ }
+
+ @Override
+ public double getDouble(int columnIndex) {
+ return (double) rowRecord[columnIndex];
+ }
+
+ @Override
+ public boolean getBoolean(int columnIndex) {
+ return (boolean) rowRecord[columnIndex];
+ }
+
+ @Override
+ public Binary getBinary(int columnIndex) {
+ return (Binary) rowRecord[columnIndex];
+ }
+
+ @Override
+ public String getString(int columnIndex) {
+ return ((Binary) rowRecord[columnIndex]).getStringValue();
+ }
+
+ @Override
+ public TSDataType getDataType(int columnIndex) {
+ return dataTypes[columnIndex];
+ }
+
+ @Override
+ public boolean isNull(int columnIndex) {
+ return rowRecord[columnIndex] == null;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ public Row setRowRecord(Object[] rowRecord) {
+ this.rowRecord = rowRecord;
+ return this;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
copy to server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
index 4157f90..486120a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
@@ -22,27 +22,33 @@ package org.apache.iotdb.db.query.udf.core.access;
import org.apache.iotdb.db.query.udf.api.access.Row;
import org.apache.iotdb.db.query.udf.api.access.RowIterator;
import org.apache.iotdb.db.query.udf.api.access.RowWindow;
-import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWindow {
+import java.io.IOException;
+
+public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements RowWindow {
+
+ private final ElasticSerializableRowRecordList rowRecordList;
+ private final TSDataType[] dataTypes;
- private final ElasticSerializableTVList tvList;
private int beginIndex;
private int endIndex;
private int size;
- private final ElasticSerializableTVListBackedSingleColumnRow row;
- private ElasticSerializableTVListBackedSingleColumnWindowIterator rowIterator;
+ private final ElasticSerializableRowRecordListBackedMultiColumnRow row;
+ private ElasticSerializableRowRecordListBackedMultiColumnWindowIterator rowIterator;
+
+ public ElasticSerializableRowRecordListBackedMultiColumnWindow(
+ ElasticSerializableRowRecordList rowRecordList) {
+ this.rowRecordList = rowRecordList;
+ this.dataTypes = rowRecordList.getDataTypes();
- // [beginIndex, endIndex)
- public ElasticSerializableTVListBackedSingleColumnWindow(ElasticSerializableTVList tvList) {
- this.tvList = tvList;
beginIndex = 0;
endIndex = 0;
size = 0;
- row = new ElasticSerializableTVListBackedSingleColumnRow(tvList, beginIndex);
+ row = new ElasticSerializableRowRecordListBackedMultiColumnRow(dataTypes);
}
@Override
@@ -51,25 +57,21 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
}
@Override
- public Row getRow(int rowIndex) {
- if (rowIndex < beginIndex || endIndex <= rowIndex) {
- throw new ArrayIndexOutOfBoundsException(
- String.format("Array index(%d) out of range [%d, %d).", rowIndex, beginIndex, endIndex));
- }
- return row.seek(beginIndex + rowIndex);
+ public Row getRow(int rowIndex) throws IOException {
+ return row.setRowRecord(rowRecordList.getRowRecord(beginIndex + rowIndex));
}
@Override
public TSDataType getDataType(int columnIndex) {
- return tvList.getDataType();
+ return dataTypes[columnIndex];
}
@Override
public RowIterator getRowIterator() {
if (rowIterator == null) {
rowIterator =
- new ElasticSerializableTVListBackedSingleColumnWindowIterator(
- tvList, beginIndex, endIndex);
+ new ElasticSerializableRowRecordListBackedMultiColumnWindowIterator(
+ rowRecordList, beginIndex, endIndex);
}
rowIterator.reset();
@@ -81,7 +83,6 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
this.endIndex = endIndex;
size = endIndex - beginIndex;
- row.seek(beginIndex);
rowIterator = null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindowIterator.java
similarity index 65%
copy from server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
copy to server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindowIterator.java
index 5991d50..801e43f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindowIterator.java
@@ -21,23 +21,27 @@ package org.apache.iotdb.db.query.udf.core.access;
import org.apache.iotdb.db.query.udf.api.access.Row;
import org.apache.iotdb.db.query.udf.api.access.RowIterator;
-import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
import java.io.IOException;
-public class ElasticSerializableTVListBackedSingleColumnWindowIterator implements RowIterator {
+public class ElasticSerializableRowRecordListBackedMultiColumnWindowIterator
+ implements RowIterator {
+ private final ElasticSerializableRowRecordList rowRecordList;
private final int beginIndex;
private final int size;
- private final ElasticSerializableTVListBackedSingleColumnRow row;
+
+ private final ElasticSerializableRowRecordListBackedMultiColumnRow row;
private int rowIndex;
- // [beginIndex, endIndex)
- public ElasticSerializableTVListBackedSingleColumnWindowIterator(
- ElasticSerializableTVList tvList, int beginIndex, int endIndex) {
+ public ElasticSerializableRowRecordListBackedMultiColumnWindowIterator(
+ ElasticSerializableRowRecordList rowRecordList, int beginIndex, int endIndex) {
+ this.rowRecordList = rowRecordList;
this.beginIndex = beginIndex;
size = endIndex - beginIndex;
- row = new ElasticSerializableTVListBackedSingleColumnRow(tvList, beginIndex);
+
+ row = new ElasticSerializableRowRecordListBackedMultiColumnRow(rowRecordList.getDataTypes());
rowIndex = -1;
}
@@ -48,7 +52,7 @@ public class ElasticSerializableTVListBackedSingleColumnWindowIterator implement
@Override
public Row next() throws IOException {
- return row.seek(++rowIndex + beginIndex);
+ return row.setRowRecord(rowRecordList.getRowRecord(++rowIndex + beginIndex));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
index 4157f90..3296d5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
@@ -52,10 +52,6 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
@Override
public Row getRow(int rowIndex) {
- if (rowIndex < beginIndex || endIndex <= rowIndex) {
- throw new ArrayIndexOutOfBoundsException(
- String.format("Array index(%d) out of range [%d, %d).", rowIndex, beginIndex, endIndex));
- }
return row.seek(beginIndex + rowIndex);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
index 5991d50..7dc35d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
@@ -29,6 +29,7 @@ public class ElasticSerializableTVListBackedSingleColumnWindowIterator implement
private final int beginIndex;
private final int size;
+
private final ElasticSerializableTVListBackedSingleColumnRow row;
private int rowIndex;
@@ -37,6 +38,7 @@ public class ElasticSerializableTVListBackedSingleColumnWindowIterator implement
ElasticSerializableTVList tvList, int beginIndex, int endIndex) {
this.beginIndex = beginIndex;
size = endIndex - beginIndex;
+
row = new ElasticSerializableTVListBackedSingleColumnRow(tvList, beginIndex);
rowIndex = -1;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
index 5801eef..d098699 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
@@ -20,7 +20,9 @@
package org.apache.iotdb.db.query.udf.core.layer;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -78,4 +80,25 @@ public class LayerCacheUtils {
return true;
}
+
+ /** @return number of actually collected, which may be less than or equals to rowsNumber */
+ public static int cacheRows(
+ UDFInputDataSet source, ElasticSerializableRowRecordList target, int rowsNumber)
+ throws QueryProcessException, IOException {
+ int count = 0;
+ while (count < rowsNumber && cacheRow(source, target)) {
+ ++count;
+ }
+ return count;
+ }
+
+ public static boolean cacheRow(UDFInputDataSet source, ElasticSerializableRowRecordList target)
+ throws IOException, QueryProcessException {
+ if (source.hasNextRowInObjects()) {
+ target.put(source.nextRowInObjects());
+ return true;
+ } else {
+ return false;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
new file mode 100644
index 0000000..1766ff4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableRowRecordListBackedMultiColumnRow;
+import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableRowRecordListBackedMultiColumnWindow;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class MultiInputColumnIntermediateLayer extends IntermediateLayer
+ implements UDFInputDataSet {
+
+ private final LayerPointReader[] layerPointReaders;
+ private final TSDataType[] dataTypes;
+ private final TimeSelector timeHeap;
+
+ public MultiInputColumnIntermediateLayer(
+ long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders)
+ throws QueryProcessException, IOException {
+ super(queryId, memoryBudgetInMB);
+
+ layerPointReaders = parentLayerPointReaders.toArray(new LayerPointReader[0]);
+
+ dataTypes = new TSDataType[layerPointReaders.length];
+ for (int i = 0; i < layerPointReaders.length; ++i) {
+ dataTypes[i] = layerPointReaders[i].getDataType();
+ }
+
+ timeHeap = new TimeSelector(layerPointReaders.length << 1, true);
+ for (LayerPointReader reader : layerPointReaders) {
+ if (reader.next()) {
+ timeHeap.add(reader.currentTime());
+ }
+ }
+ }
+
+ @Override
+ public List<TSDataType> getDataTypes() {
+ return Arrays.asList(dataTypes);
+ }
+
+ @Override
+ public boolean hasNextRowInObjects() {
+ return !timeHeap.isEmpty();
+ }
+
+ @Override
+ public Object[] nextRowInObjects() throws IOException {
+ long minTime = timeHeap.pollFirst();
+
+ int rowLength = layerPointReaders.length;
+ Object[] row = new Object[rowLength + 1];
+ row[rowLength] = minTime;
+
+ try {
+ for (int i = 0; i < rowLength; ++i) {
+ LayerPointReader reader = layerPointReaders[i];
+ if (!reader.next() || reader.currentTime() != minTime) {
+ continue;
+ }
+
+ switch (reader.getDataType()) {
+ case INT32:
+ row[i] = reader.currentInt();
+ break;
+ case INT64:
+ row[i] = reader.currentLong();
+ break;
+ case FLOAT:
+ row[i] = reader.currentFloat();
+ break;
+ case DOUBLE:
+ row[i] = reader.currentDouble();
+ break;
+ case BOOLEAN:
+ row[i] = reader.currentBoolean();
+ break;
+ case TEXT:
+ row[i] = reader.currentBinary();
+ break;
+ default:
+ throw new UnSupportedDataTypeException("Unsupported data type.");
+ }
+ reader.readyForNext();
+
+ if (reader.next()) {
+ timeHeap.add(reader.currentTime());
+ }
+ }
+ } catch (QueryProcessException e) {
+ throw new IOException(e.getMessage());
+ }
+
+ return row;
+ }
+
+ @Override
+ public LayerPointReader constructPointReader() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LayerRowReader constructRowReader() {
+
+ return new LayerRowReader() {
+
+ private final ElasticSerializableRowRecordListBackedMultiColumnRow row =
+ new ElasticSerializableRowRecordListBackedMultiColumnRow(dataTypes);
+
+ private boolean hasCached = false;
+
+ @Override
+ public boolean next() throws IOException {
+ if (hasCached) {
+ return true;
+ }
+
+ if (!hasNextRowInObjects()) {
+ return false;
+ }
+
+ row.setRowRecord(nextRowInObjects());
+ hasCached = true;
+ return true;
+ }
+
+ @Override
+ public void readyForNext() {
+ hasCached = false;
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return dataTypes;
+ }
+
+ @Override
+ public long currentTime() {
+ return row.getTime();
+ }
+
+ @Override
+ public Row currentRow() {
+ return row;
+ }
+ };
+ }
+
+ @Override
+ protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+ SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
+ throws QueryProcessException {
+
+ final UDFInputDataSet udfInputDataSet = this;
+
+ return new LayerRowWindowReader() {
+
+ private final int windowSize = strategy.getWindowSize();
+ private final int slidingStep = strategy.getSlidingStep();
+
+ private final ElasticSerializableRowRecordList rowRecordList =
+ new ElasticSerializableRowRecordList(dataTypes, queryId, memoryBudgetInMB, 2);
+ private final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+ new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+ private boolean hasCached = false;
+ private int beginIndex = -slidingStep;
+
+ @Override
+ public boolean next() throws IOException, QueryProcessException {
+ if (hasCached) {
+ return true;
+ }
+
+ beginIndex += slidingStep;
+ int endIndex = beginIndex + windowSize;
+
+ int rowsToBeCollected = endIndex - rowRecordList.size();
+ if (0 < rowsToBeCollected) {
+ hasCached =
+ LayerCacheUtils.cacheRows(udfInputDataSet, rowRecordList, rowsToBeCollected) != 0;
+ window.seek(beginIndex, rowRecordList.size());
+ } else {
+ hasCached = true;
+ window.seek(beginIndex, endIndex);
+ }
+
+ return hasCached;
+ }
+
+ @Override
+ public void readyForNext() {
+ hasCached = false;
+
+ rowRecordList.setEvictionUpperBound(beginIndex + 1);
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return dataTypes;
+ }
+
+ @Override
+ public RowWindow currentWindow() {
+ return window;
+ }
+ };
+ }
+
+ @Override
+ protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+ SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+ throws QueryProcessException, IOException {
+
+ final long timeInterval = strategy.getTimeInterval();
+ final long slidingStep = strategy.getSlidingStep();
+ final long displayWindowEnd = strategy.getDisplayWindowEnd();
+
+ final UDFInputDataSet udfInputDataSet = this;
+ final ElasticSerializableRowRecordList rowRecordList =
+ new ElasticSerializableRowRecordList(dataTypes, queryId, memoryBudgetInMB, 2);
+ final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+ new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+ long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
+ if (rowRecordList.size() == 0
+ && LayerCacheUtils.cacheRow(udfInputDataSet, rowRecordList)
+ && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+ // display window begin should be set to the same as the min timestamp of the query result
+ // set
+ nextWindowTimeBeginGivenByStrategy = rowRecordList.getTime(0);
+ }
+ long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy;
+
+ final boolean hasAtLeastOneRow = rowRecordList.size() != 0;
+
+ return new LayerRowWindowReader() {
+
+ private long nextWindowTimeBegin = finalNextWindowTimeBeginGivenByStrategy;
+ private int nextIndexBegin = 0;
+
+ @Override
+ public boolean next() throws IOException, QueryProcessException {
+ if (displayWindowEnd <= nextWindowTimeBegin) {
+ return false;
+ }
+ if (!hasAtLeastOneRow || 0 < rowRecordList.size()) {
+ return true;
+ }
+
+ long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);
+ int oldTVListSize = rowRecordList.size();
+ while (rowRecordList.getTime(rowRecordList.size() - 1) < nextWindowTimeEnd) {
+ if (!LayerCacheUtils.cacheRow(udfInputDataSet, rowRecordList)) {
+ if (displayWindowEnd == Long.MAX_VALUE
+ // display window end == the max timestamp of the query result set
+ && oldTVListSize == rowRecordList.size()) {
+ return false;
+ } else {
+ break;
+ }
+ }
+ }
+
+ for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+ if (nextWindowTimeBegin <= rowRecordList.getTime(i)) {
+ nextIndexBegin = i;
+ break;
+ }
+ if (i == rowRecordList.size() - 1) {
+ nextIndexBegin = rowRecordList.size();
+ }
+ }
+
+ int nextIndexEnd = rowRecordList.size();
+ for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+ if (nextWindowTimeEnd <= rowRecordList.getTime(i)) {
+ nextIndexEnd = i;
+ break;
+ }
+ }
+ window.seek(nextIndexBegin, nextIndexEnd);
+
+ return true;
+ }
+
+ @Override
+ public void readyForNext() {
+ nextWindowTimeBegin += slidingStep;
+
+ rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return dataTypes;
+ }
+
+ @Override
+ public RowWindow currentWindow() {
+ return window;
+ }
+ };
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnMultiReferenceIntermediateLayer.java
deleted file mode 100644
index 861fd7d..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnMultiReferenceIntermediateLayer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.udf.core.layer;
-
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
-
-import java.util.List;
-
-public class MultiInputColumnMultiReferenceIntermediateLayer extends IntermediateLayer {
-
- public MultiInputColumnMultiReferenceIntermediateLayer(
- long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
- super(queryId, memoryBudgetInMB);
- }
-
- @Override
- public LayerPointReader constructPointReader() {
- return null;
- }
-
- @Override
- public LayerRowReader constructRowReader() {
- return null;
- }
-
- @Override
- protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
- SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
- return null;
- }
-
- @Override
- protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
- SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
- return null;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnSingleReferenceIntermediateLayer.java
deleted file mode 100644
index 9ef769d..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnSingleReferenceIntermediateLayer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.udf.core.layer;
-
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
-
-import java.util.List;
-
-public class MultiInputColumnSingleReferenceIntermediateLayer extends IntermediateLayer {
-
- public MultiInputColumnSingleReferenceIntermediateLayer(
- long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
- super(queryId, memoryBudgetInMB);
- }
-
- @Override
- public LayerPointReader constructPointReader() {
- return null;
- }
-
- @Override
- public LayerRowReader constructRowReader() {
- return null;
- }
-
- @Override
- protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
- SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
- return null;
- }
-
- @Override
- protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
- SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
- return null;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/row/ElasticSerializableRowRecordList.java b/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/row/ElasticSerializableRowRecordList.java
index 910c842..7171ca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/row/ElasticSerializableRowRecordList.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/row/ElasticSerializableRowRecordList.java
@@ -116,6 +116,10 @@ public class ElasticSerializableRowRecordList {
return size;
}
+ public TSDataType[] getDataTypes() {
+ return dataTypes;
+ }
+
public long getTime(int index) throws IOException {
return cache
.get(index / internalRowRecordListCapacity)
@@ -138,7 +142,8 @@ public class ElasticSerializableRowRecordList {
(long) indexListOfTextFields.length * byteArrayLengthForMemoryControl;
if (rowRecord == null) {
- totalByteArrayLength += indexListOfTextFields.length * byteArrayLengthForMemoryControl;
+ totalByteArrayLength +=
+ (long) indexListOfTextFields.length * byteArrayLengthForMemoryControl;
} else {
for (int indexListOfTextField : indexListOfTextFields) {
Binary binary = (Binary) rowRecord[indexListOfTextField];
@@ -163,7 +168,7 @@ public class ElasticSerializableRowRecordList {
}
int newByteArrayLengthForMemoryControl = byteArrayLengthForMemoryControl;
- while (newByteArrayLengthForMemoryControl * size < totalByteArrayLength) {
+ while ((long) newByteArrayLengthForMemoryControl * size < totalByteArrayLength) {
newByteArrayLengthForMemoryControl *= 2;
}
int newInternalTVListCapacity =