You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/09 07:18:42 UTC
[iotdb] branch nested-operations updated: clean RawQueryInputLayer
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/nested-operations by this push:
new 549f2e1 clean RawQueryInputLayer
549f2e1 is described below
commit 549f2e1da68e1602aa5450840ea4ea79a2fd6c26
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Sep 9 15:18:02 2021 +0800
clean RawQueryInputLayer
---
.../db/query/dataset/UDTFAlignByTimeDataSet.java | 4 +-
.../apache/iotdb/db/query/dataset/UDTFDataSet.java | 14 +-
.../db/query/dataset/UDTFNonAlignDataSet.java | 2 +-
.../iotdb/db/query/expression/Expression.java | 4 +-
.../query/expression/binary/BinaryExpression.java | 4 +-
.../query/expression/unary/FunctionExpression.java | 6 +-
.../query/expression/unary/NegationExpression.java | 4 +-
.../query/expression/unary/TimeSeriesOperand.java | 4 +-
.../iotdb/db/query/udf/core/layer/DAGBuilder.java | 5 +-
.../query/udf/core/layer/RawQueryInputLayer.java | 197 +++++++
.../iotdb/db/query/udf/core/layer/UDFLayer.java | 575 ---------------------
11 files changed, 221 insertions(+), 598 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
index c7e8af4..beec5d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
@@ -193,7 +193,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
}
// todo: control upper bound here
- udfLayer.updateRowRecordListEvictionUpperBound();
+ rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
}
/*
@@ -296,7 +296,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
}
// todo: control upper bound here
- udfLayer.updateRowRecordListEvictionUpperBound();
+ rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
return rowRecord;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 3f69f62..f58a8de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.query.udf.core.layer.DAGBuilder;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
@@ -48,7 +48,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
protected final long queryId;
protected final UDTFPlan udtfPlan;
- protected final UDFLayer udfLayer;
+ protected final RawQueryInputLayer rawQueryInputLayer;
protected LayerPointReader[] transformers;
@@ -65,8 +65,8 @@ public abstract class UDTFDataSet extends QueryDataSet {
super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
queryId = queryContext.getQueryId();
this.udtfPlan = udtfPlan;
- udfLayer =
- new UDFLayer(
+ rawQueryInputLayer =
+ new RawQueryInputLayer(
queryId,
UDF_READER_MEMORY_BUDGET_IN_MB,
deduplicatedPaths,
@@ -89,8 +89,8 @@ public abstract class UDTFDataSet extends QueryDataSet {
super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
queryId = queryContext.getQueryId();
this.udtfPlan = udtfPlan;
- udfLayer =
- new UDFLayer(
+ rawQueryInputLayer =
+ new RawQueryInputLayer(
queryId,
UDF_READER_MEMORY_BUDGET_IN_MB,
deduplicatedPaths,
@@ -102,7 +102,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
protected void initTransformers() throws QueryProcessException, IOException {
transformers =
- new DAGBuilder(queryId, udtfPlan, udfLayer, UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB)
+ new DAGBuilder(queryId, udtfPlan, rawQueryInputLayer, UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB)
.buildLayerMemoryAssigner()
.buildResultColumnPointReaders()
.getResultColumnPointReaders();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
index e3810bb..05f954c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
@@ -114,7 +114,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa
}
// todo: control upper bound here
- udfLayer.updateRowRecordListEvictionUpperBound();
+ rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
tsQueryNonAlignDataSet.setTimeList(timeBufferList);
tsQueryNonAlignDataSet.setValueList(valueBufferList);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 738cbea..cea4875 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import java.io.IOException;
import java.time.ZoneId;
@@ -63,7 +63,7 @@ public abstract class Expression {
public abstract IntermediateLayer constructIntermediateLayer(
long queryId,
UDTFPlan udtfPlan,
- UDFLayer rawTimeSeriesInputLayer,
+ RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 4b76daa..99cfffc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
@@ -134,7 +134,7 @@ public abstract class BinaryExpression extends Expression {
public IntermediateLayer constructIntermediateLayer(
long queryId,
UDTFPlan udtfPlan,
- UDFLayer rawTimeSeriesInputLayer,
+ RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 4fa6bbb..adb703b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -32,9 +32,9 @@ import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
import org.apache.iotdb.db.query.udf.core.layer.MultiInputColumnIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowWindowTransformer;
@@ -185,7 +185,7 @@ public class FunctionExpression extends Expression {
public IntermediateLayer constructIntermediateLayer(
long queryId,
UDTFPlan udtfPlan,
- UDFLayer rawTimeSeriesInputLayer,
+ RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException {
@@ -217,7 +217,7 @@ public class FunctionExpression extends Expression {
private IntermediateLayer constructUdfInputIntermediateLayer(
long queryId,
UDTFPlan udtfPlan,
- UDFLayer rawTimeSeriesInputLayer,
+ RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index a85b386..0e33cd8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
@@ -98,7 +98,7 @@ public class NegationExpression extends Expression {
public IntermediateLayer constructIntermediateLayer(
long queryId,
UDTFPlan udtfPlan,
- UDFLayer rawTimeSeriesInputLayer,
+ RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index ec33153..55a79b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -91,7 +91,7 @@ public class TimeSeriesOperand extends Expression {
public IntermediateLayer constructIntermediateLayer(
long queryId,
UDTFPlan udtfPlan,
- UDFLayer rawTimeSeriesInputLayer,
+ RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index e03ab9a..1a33c17 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -32,7 +32,7 @@ public class DAGBuilder {
private final long queryId;
private final UDTFPlan udtfPlan;
- private final UDFLayer rawTimeSeriesInputLayer;
+ private final RawQueryInputLayer rawTimeSeriesInputLayer;
// input
private final Expression[] resultColumnExpressions;
@@ -47,7 +47,8 @@ public class DAGBuilder {
// make sure that only one point reader will be built for one expression.
private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
- public DAGBuilder(long queryId, UDTFPlan udtfPlan, UDFLayer inputLayer, float memoryBudgetInMB) {
+ public DAGBuilder(
+ long queryId, UDTFPlan udtfPlan, RawQueryInputLayer inputLayer, float memoryBudgetInMB) {
this.queryId = queryId;
this.udtfPlan = udtfPlan;
this.rawTimeSeriesInputLayer = inputLayer;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
new file mode 100644
index 0000000..b67923c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithValueFilter;
+import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
+import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
+import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.io.IOException;
+import java.util.List;
+
+public class RawQueryInputLayer {
+
+ private UDFInputDataSet queryDataSet;
+ private TSDataType[] dataTypes;
+ private int timestampIndex;
+
+ private ElasticSerializableRowRecordList rowRecordList;
+ private SafetyLine safetyLine;
+
+ /** InputLayerWithoutValueFilter */
+ public RawQueryInputLayer(
+ long queryId,
+ float memoryBudgetInMB,
+ List<PartialPath> paths,
+ List<TSDataType> dataTypes,
+ List<ManagedSeriesReader> readers)
+ throws QueryProcessException, IOException, InterruptedException {
+ construct(
+ queryId,
+ memoryBudgetInMB,
+ new RawQueryDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true));
+ }
+
+ /** InputLayerWithValueFilter */
+ public RawQueryInputLayer(
+ long queryId,
+ float memoryBudgetInMB,
+ List<PartialPath> paths,
+ List<TSDataType> dataTypes,
+ TimeGenerator timeGenerator,
+ List<IReaderByTimestamp> readers,
+ List<Boolean> cached)
+ throws QueryProcessException {
+ construct(
+ queryId,
+ memoryBudgetInMB,
+ new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true));
+ }
+
+ private void construct(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
+ throws QueryProcessException {
+ this.queryDataSet = queryDataSet;
+ dataTypes = queryDataSet.getDataTypes().toArray(new TSDataType[0]);
+ timestampIndex = dataTypes.length;
+ rowRecordList =
+ new ElasticSerializableRowRecordList(
+ dataTypes, queryId, memoryBudgetInMB, 1 + dataTypes.length / 2);
+ safetyLine = new SafetyLine();
+ }
+
+ public void updateRowRecordListEvictionUpperBound() {
+ rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
+ }
+
+ public LayerPointReader constructPointReader(int columnIndex) {
+ return new InputLayerPointReader(columnIndex);
+ }
+
+ private class InputLayerPointReader implements LayerPointReader {
+
+ private final SafetyPile safetyPile;
+
+ private final int columnIndex;
+ private int currentRowIndex;
+
+ private boolean hasCachedRowRecord;
+ private Object[] cachedRowRecord;
+
+ InputLayerPointReader(int columnIndex) {
+ safetyPile = safetyLine.addSafetyPile();
+
+ this.columnIndex = columnIndex;
+ currentRowIndex = -1;
+
+ hasCachedRowRecord = false;
+ cachedRowRecord = null;
+ }
+
+ @Override
+ public boolean next() throws IOException, QueryProcessException {
+ if (hasCachedRowRecord) {
+ return true;
+ }
+
+ for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
+ Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
+ if (rowRecordCandidate[columnIndex] != null) {
+ hasCachedRowRecord = true;
+ cachedRowRecord = rowRecordCandidate;
+ currentRowIndex = i;
+ break;
+ }
+ }
+
+ if (!hasCachedRowRecord) {
+ while (queryDataSet.hasNextRowInObjects()) {
+ Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
+ rowRecordList.put(rowRecordCandidate);
+ if (rowRecordCandidate[columnIndex] != null) {
+ hasCachedRowRecord = true;
+ cachedRowRecord = rowRecordCandidate;
+ currentRowIndex = rowRecordList.size() - 1;
+ break;
+ }
+ }
+ }
+
+ return hasCachedRowRecord;
+ }
+
+ @Override
+ public void readyForNext() {
+ hasCachedRowRecord = false;
+ cachedRowRecord = null;
+
+ safetyPile.moveForwardTo(currentRowIndex + 1);
+ }
+
+ @Override
+ public TSDataType getDataType() {
+ return dataTypes[columnIndex];
+ }
+
+ @Override
+ public long currentTime() {
+ return (long) cachedRowRecord[timestampIndex];
+ }
+
+ @Override
+ public int currentInt() {
+ return (int) cachedRowRecord[columnIndex];
+ }
+
+ @Override
+ public long currentLong() {
+ return (long) cachedRowRecord[columnIndex];
+ }
+
+ @Override
+ public float currentFloat() {
+ return (float) cachedRowRecord[columnIndex];
+ }
+
+ @Override
+ public double currentDouble() {
+ return (double) cachedRowRecord[columnIndex];
+ }
+
+ @Override
+ public boolean currentBoolean() {
+ return (boolean) cachedRowRecord[columnIndex];
+ }
+
+ @Override
+ public Binary currentBinary() {
+ return (Binary) cachedRowRecord[columnIndex];
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
deleted file mode 100644
index 3148dd4..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.udf.core.layer;
-
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithValueFilter;
-import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
-import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
-import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
-import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.db.query.udf.api.access.Row;
-import org.apache.iotdb.db.query.udf.api.access.RowWindow;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.core.access.MultiColumnRow;
-import org.apache.iotdb.db.query.udf.core.access.MultiColumnWindow;
-import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
-import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
-import org.apache.iotdb.db.query.udf.datastructure.primitive.ElasticSerializableIntList;
-import org.apache.iotdb.db.query.udf.datastructure.primitive.IntList;
-import org.apache.iotdb.db.query.udf.datastructure.primitive.SerializableIntList;
-import org.apache.iotdb.db.query.udf.datastructure.primitive.WrappedIntArray;
-import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-import java.io.IOException;
-import java.util.List;
-
-public class UDFLayer {
-
- private long queryId;
-
- private UDFInputDataSet queryDataSet;
- private TSDataType[] dataTypes;
- private int timestampIndex;
-
- private ElasticSerializableRowRecordList rowRecordList;
- private SafetyLine safetyLine;
-
- /** InputLayerWithoutValueFilter */
- public UDFLayer(
- long queryId,
- float memoryBudgetInMB,
- List<PartialPath> paths,
- List<TSDataType> dataTypes,
- List<ManagedSeriesReader> readers)
- throws QueryProcessException, IOException, InterruptedException {
- construct(
- queryId,
- memoryBudgetInMB,
- new RawQueryDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true));
- }
-
- /** InputLayerWithValueFilter */
- public UDFLayer(
- long queryId,
- float memoryBudgetInMB,
- List<PartialPath> paths,
- List<TSDataType> dataTypes,
- TimeGenerator timeGenerator,
- List<IReaderByTimestamp> readers,
- List<Boolean> cached)
- throws QueryProcessException {
- construct(
- queryId,
- memoryBudgetInMB,
- new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true));
- }
-
- public UDFLayer(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
- throws QueryProcessException {
- construct(queryId, memoryBudgetInMB, queryDataSet);
- }
-
- private void construct(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
- throws QueryProcessException {
- this.queryId = queryId;
- this.queryDataSet = queryDataSet;
- dataTypes = queryDataSet.getDataTypes().toArray(new TSDataType[0]);
- timestampIndex = dataTypes.length;
- rowRecordList =
- new ElasticSerializableRowRecordList(
- dataTypes, queryId, memoryBudgetInMB, 1 + dataTypes.length / 2);
- safetyLine = new SafetyLine();
- }
-
- public void updateRowRecordListEvictionUpperBound() {
- rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
- }
-
- public LayerPointReader constructPointReader(int columnIndex) {
- return new InputLayerPointReader(columnIndex);
- }
-
- public LayerRowReader constructRowReader(int[] columnIndexes) {
- return new InputLayerRowReader(columnIndexes);
- }
-
- public LayerRowWindowReader constructRowWindowReader(
- int[] columnIndexes, AccessStrategy strategy, float memoryBudgetInMB)
- throws QueryProcessException, IOException {
- switch (strategy.getAccessStrategyType()) {
- case SLIDING_TIME_WINDOW:
- return new InputLayerRowSlidingTimeWindowReader(
- columnIndexes, (SlidingTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
- case SLIDING_SIZE_WINDOW:
- return new InputLayerRowSlidingSizeWindowReader(
- columnIndexes, (SlidingSizeWindowAccessStrategy) strategy, memoryBudgetInMB);
- default:
- throw new IllegalStateException(
- "Unexpected access strategy: " + strategy.getAccessStrategyType());
- }
- }
-
- private class InputLayerPointReader implements LayerPointReader {
-
- private final SafetyPile safetyPile;
-
- private final int columnIndex;
- private int currentRowIndex;
-
- private boolean hasCachedRowRecord;
- private Object[] cachedRowRecord;
-
- InputLayerPointReader(int columnIndex) {
- safetyPile = safetyLine.addSafetyPile();
-
- this.columnIndex = columnIndex;
- currentRowIndex = -1;
-
- hasCachedRowRecord = false;
- cachedRowRecord = null;
- }
-
- @Override
- public boolean next() throws IOException, QueryProcessException {
- if (hasCachedRowRecord) {
- return true;
- }
-
- for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
- Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
- if (rowRecordCandidate[columnIndex] != null) {
- hasCachedRowRecord = true;
- cachedRowRecord = rowRecordCandidate;
- currentRowIndex = i;
- break;
- }
- }
-
- if (!hasCachedRowRecord) {
- while (queryDataSet.hasNextRowInObjects()) {
- Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
- rowRecordList.put(rowRecordCandidate);
- if (rowRecordCandidate[columnIndex] != null) {
- hasCachedRowRecord = true;
- cachedRowRecord = rowRecordCandidate;
- currentRowIndex = rowRecordList.size() - 1;
- break;
- }
- }
- }
-
- return hasCachedRowRecord;
- }
-
- @Override
- public void readyForNext() {
- hasCachedRowRecord = false;
- cachedRowRecord = null;
-
- safetyPile.moveForwardTo(currentRowIndex + 1);
- }
-
- @Override
- public TSDataType getDataType() {
- return dataTypes[columnIndex];
- }
-
- @Override
- public long currentTime() {
- return (long) cachedRowRecord[timestampIndex];
- }
-
- @Override
- public int currentInt() {
- return (int) cachedRowRecord[columnIndex];
- }
-
- @Override
- public long currentLong() {
- return (long) cachedRowRecord[columnIndex];
- }
-
- @Override
- public float currentFloat() {
- return (float) cachedRowRecord[columnIndex];
- }
-
- @Override
- public double currentDouble() {
- return (double) cachedRowRecord[columnIndex];
- }
-
- @Override
- public boolean currentBoolean() {
- return (boolean) cachedRowRecord[columnIndex];
- }
-
- @Override
- public Binary currentBinary() {
- return (Binary) cachedRowRecord[columnIndex];
- }
- }
-
- private class InputLayerRowReader implements LayerRowReader {
-
- private final SafetyPile safetyPile;
-
- private final int[] columnIndexes;
- private int currentRowIndex;
-
- private boolean hasCachedRowRecord;
- private Object[] cachedRowRecord;
-
- private final MultiColumnRow row;
-
- public InputLayerRowReader(int[] columnIndexes) {
- safetyPile = safetyLine.addSafetyPile();
-
- this.columnIndexes = columnIndexes;
- currentRowIndex = -1;
-
- hasCachedRowRecord = false;
- cachedRowRecord = null;
-
- row = new MultiColumnRow(columnIndexes, dataTypes);
- }
-
- @Override
- public boolean next() throws IOException, QueryProcessException {
- if (hasCachedRowRecord) {
- return true;
- }
-
- for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
- Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
- if (hasNotNullSelectedFields(rowRecordCandidate, columnIndexes)) {
- hasCachedRowRecord = true;
- cachedRowRecord = rowRecordCandidate;
- currentRowIndex = i;
- break;
- }
- }
-
- if (!hasCachedRowRecord) {
- while (queryDataSet.hasNextRowInObjects()) {
- Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
- rowRecordList.put(rowRecordCandidate);
- if (hasNotNullSelectedFields(rowRecordCandidate, columnIndexes)) {
- hasCachedRowRecord = true;
- cachedRowRecord = rowRecordCandidate;
- currentRowIndex = rowRecordList.size() - 1;
- break;
- }
- }
- }
-
- return hasCachedRowRecord;
- }
-
- @Override
- public void readyForNext() {
- hasCachedRowRecord = false;
- cachedRowRecord = null;
-
- safetyPile.moveForwardTo(currentRowIndex + 1);
- }
-
- @Override
- public TSDataType[] getDataTypes() {
- return dataTypes;
- }
-
- @Override
- public long currentTime() {
- return (long) cachedRowRecord[timestampIndex];
- }
-
- @Override
- public Row currentRow() {
- return row.setRowRecord(cachedRowRecord);
- }
- }
-
- private class InputLayerRowSlidingSizeWindowReader implements LayerRowWindowReader {
-
- private final SafetyPile safetyPile;
-
- private final int[] columnIndexes;
- private final TSDataType[] columnDataTypes;
-
- private final int windowSize;
- private final IntList rowIndexes;
- private final MultiColumnWindow rowWindow;
-
- private final int slidingStep;
-
- private int maxIndexInLastWindow;
-
- private InputLayerRowSlidingSizeWindowReader(
- int[] columnIndexes, SlidingSizeWindowAccessStrategy accessStrategy, float memoryBudgetInMB)
- throws QueryProcessException {
- safetyPile = safetyLine.addSafetyPile();
-
- this.columnIndexes = columnIndexes;
- columnDataTypes = new TSDataType[columnIndexes.length];
- for (int i = 0; i < columnIndexes.length; ++i) {
- columnDataTypes[i] = dataTypes[columnIndexes[i]];
- }
-
- windowSize = accessStrategy.getWindowSize();
- rowIndexes =
- windowSize < SerializableIntList.calculateCapacity(memoryBudgetInMB)
- ? new WrappedIntArray(windowSize)
- : new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
- rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
-
- slidingStep = accessStrategy.getSlidingStep();
-
- maxIndexInLastWindow = -1;
- }
-
- @Override
- public boolean next() throws IOException, QueryProcessException {
- if (0 < rowIndexes.size()) {
- return true;
- }
-
- int count = 0;
-
- for (int i = maxIndexInLastWindow + 1; i < rowRecordList.size(); ++i) {
- if (hasNotNullSelectedFields(rowRecordList.getRowRecord(i), columnIndexes)) {
- rowIndexes.put(i);
- ++count;
- if (count == windowSize) {
- return true;
- }
- }
- }
-
- while (queryDataSet.hasNextRowInObjects()) {
- Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
- rowRecordList.put(rowRecordCandidate);
- if (hasNotNullSelectedFields(rowRecordCandidate, columnIndexes)) {
- rowIndexes.put(rowRecordList.size() - 1);
- ++count;
- if (count == windowSize) {
- return true;
- }
- }
- }
-
- return count != 0;
- }
-
- @Override
- public void readyForNext() throws IOException, QueryProcessException {
- updateMaxIndexForLastWindow();
-
- safetyPile.moveForwardTo(maxIndexInLastWindow + 1);
-
- rowIndexes.clear();
- }
-
- private void updateMaxIndexForLastWindow() throws IOException, QueryProcessException {
- if (rowIndexes.size() == 0) {
- return;
- }
-
- if (slidingStep <= rowIndexes.size()) {
- maxIndexInLastWindow = rowIndexes.get(slidingStep - 1);
- return;
- }
-
- int currentStep = rowIndexes.size() - 1;
-
- for (int i = rowIndexes.get(rowIndexes.size() - 1) + 1; i < rowRecordList.size(); ++i) {
- if (hasNotNullSelectedFields(rowRecordList.getRowRecord(i), columnIndexes)) {
- ++currentStep;
- if (currentStep == slidingStep) {
- maxIndexInLastWindow = i - 1;
- return;
- }
- }
- }
-
- while (queryDataSet.hasNextRowInObjects()) {
- Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
- rowRecordList.put(rowRecordCandidate);
- if (hasNotNullSelectedFields(rowRecordCandidate, columnIndexes)) {
- ++currentStep;
- if (currentStep == slidingStep) {
- maxIndexInLastWindow = rowRecordList.size() - 2;
- return;
- }
- }
- }
-
- maxIndexInLastWindow = rowRecordList.size() - 1;
- }
-
- @Override
- public TSDataType[] getDataTypes() {
- return columnDataTypes;
- }
-
- @Override
- public RowWindow currentWindow() {
- return rowWindow;
- }
- }
-
- private class InputLayerRowSlidingTimeWindowReader implements LayerRowWindowReader {
-
- private final SafetyPile safetyPile;
-
- private final int[] columnIndexes;
- private final TSDataType[] columnDataTypes;
-
- private final long timeInterval;
- private final long slidingStep;
- private final long displayWindowEnd;
-
- private final IntList rowIndexes;
- private final MultiColumnWindow rowWindow;
-
- private long nextWindowTimeBegin;
- private int nextIndexBegin;
-
- private final boolean hasAtLeastOneRow;
-
- private InputLayerRowSlidingTimeWindowReader(
- int[] columnIndexes, SlidingTimeWindowAccessStrategy accessStrategy, float memoryBudgetInMB)
- throws QueryProcessException, IOException {
- safetyPile = safetyLine.addSafetyPile();
-
- this.columnIndexes = columnIndexes;
- columnDataTypes = new TSDataType[columnIndexes.length];
- for (int i = 0; i < columnIndexes.length; ++i) {
- columnDataTypes[i] = dataTypes[columnIndexes[i]];
- }
-
- timeInterval = accessStrategy.getTimeInterval();
- slidingStep = accessStrategy.getSlidingStep();
- displayWindowEnd = accessStrategy.getDisplayWindowEnd();
-
- rowIndexes = new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
- rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
-
- nextWindowTimeBegin = accessStrategy.getDisplayWindowBegin();
- nextIndexBegin = 0;
-
- if (rowRecordList.size() == 0 && queryDataSet.hasNextRowInObjects()) {
- rowRecordList.put(queryDataSet.nextRowInObjects());
-
- if (nextWindowTimeBegin == Long.MIN_VALUE) {
- // display window begin should be set to the same as the min timestamp of the query result
- // set
- nextWindowTimeBegin = rowRecordList.getTime(0);
- }
- }
- hasAtLeastOneRow = rowRecordList.size() != 0;
- }
-
- @Override
- public boolean next() throws IOException, QueryProcessException {
- if (displayWindowEnd <= nextWindowTimeBegin) {
- return false;
- }
- if (!hasAtLeastOneRow || 0 < rowIndexes.size()) {
- return true;
- }
-
- long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);
- int oldRowRecordListSize = rowRecordList.size();
- while ((Long) rowRecordList.getRowRecord(rowRecordList.size() - 1)[timestampIndex]
- < nextWindowTimeEnd) {
- if (queryDataSet.hasNextRowInObjects()) {
- rowRecordList.put(queryDataSet.nextRowInObjects());
- } else if (displayWindowEnd == Long.MAX_VALUE
- // display window end == the max timestamp of the query result set
- && oldRowRecordListSize == rowRecordList.size()) {
- return false;
- } else {
- break;
- }
- }
-
- for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
- if (nextWindowTimeBegin <= (Long) rowRecordList.getRowRecord(i)[timestampIndex]) {
- nextIndexBegin = i;
- break;
- }
- if (i == rowRecordList.size() - 1) {
- nextIndexBegin = rowRecordList.size();
- }
- }
-
- for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
- Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
- if (nextWindowTimeEnd <= (Long) rowRecordCandidate[timestampIndex]) {
- break;
- }
- if (hasNotNullSelectedFields(rowRecordCandidate, columnIndexes)) {
- rowIndexes.put(i);
- }
- }
-
- return true;
- }
-
- @Override
- public void readyForNext() {
- nextWindowTimeBegin += slidingStep;
-
- safetyPile.moveForwardTo(nextIndexBegin);
-
- rowIndexes.clear();
- }
-
- @Override
- public TSDataType[] getDataTypes() {
- return columnDataTypes;
- }
-
- @Override
- public RowWindow currentWindow() {
- return rowWindow;
- }
- }
-
- private static boolean hasNotNullSelectedFields(
- Object[] rowRecordCandidate, int[] columnIndexes) {
- for (int columnIndex : columnIndexes) {
- if (rowRecordCandidate[columnIndex] != null) {
- return true;
- }
- }
- return false;
- }
-}