You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/02 08:13:40 UTC
[iotdb] 03/04: introduce MultiInputIntermediateLayer
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit de1c509c853145f8217f7ef6afee03fd9c21bfdd
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 1 18:50:33 2021 +0800
introduce MultiInputIntermediateLayer
---
.../query/expression/binary/BinaryExpression.java | 3 +-
.../query/expression/unary/FunctionExpression.java | 11 +-
.../query/expression/unary/NegationExpression.java | 3 +-
.../query/expression/unary/TimeSeriesOperand.java | 3 +-
.../db/query/udf/core/layer/IntermediateLayer.java | 141 +--------------------
.../core/layer/MultiInputIntermediateLayer.java | 35 +++++
...ayer.java => SingleInputIntermediateLayer.java} | 5 +-
7 files changed, 55 insertions(+), 146 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 4d52793..69dd13c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
@@ -125,7 +126,7 @@ public abstract class BinaryExpression extends Expression {
expressionIntermediateLayerMap.put(
this,
- new IntermediateLayer(
+ new SingleInputIntermediateLayer(
constructTransformer(
leftParentIntermediateLayer.constructPointReader(),
rightParentIntermediateLayer.constructPointReader()),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 4773ae4..7f55995 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.MultiInputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import java.util.ArrayList;
import java.util.Iterator;
@@ -153,11 +155,16 @@ public class FunctionExpression extends Expression {
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
throws QueryProcessException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
+ List<LayerPointReader> parentLayerPointReaders = new ArrayList<>();
for (Expression expression : expressions) {
- expression.constructIntermediateLayer(udtfPlan, inputLayer, expressionIntermediateLayerMap);
+ parentLayerPointReaders.add(
+ expression
+ .constructIntermediateLayer(udtfPlan, inputLayer, expressionIntermediateLayerMap)
+ .constructPointReader());
}
- // todo!
+ expressionIntermediateLayerMap.put(
+ this, new MultiInputIntermediateLayer(parentLayerPointReaders, -1, -1));
}
return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 355eca0..813ee5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
import java.util.ArrayList;
@@ -88,7 +89,7 @@ public class NegationExpression extends Expression {
expressionIntermediateLayerMap.put(
this,
- new IntermediateLayer(
+ new SingleInputIntermediateLayer(
new ArithmeticNegationTransformer(parentIntermediateLayer.constructPointReader()),
-1,
-1));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 208a760..c1e6eb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.List;
@@ -79,7 +80,7 @@ public class TimeSeriesOperand extends Expression {
if (!expressionIntermediateLayerMap.containsKey(this)) {
expressionIntermediateLayerMap.put(
this,
- new IntermediateLayer(
+ new SingleInputIntermediateLayer(
inputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath())),
-1,
-1));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index f4ccd03..cbac211 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -19,146 +19,9 @@
package org.apache.iotdb.db.query.udf.core.layer;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-import java.io.IOException;
+public interface IntermediateLayer {
-public class IntermediateLayer {
-
- private static final int CACHE_BLOCK_SIZE = 1;
-
- private final TSDataType dataType;
- private final LayerPointReader parentLayerPointReader;
- private final ElasticSerializableTVList tvList;
- private final SafetyLine safetyLine;
-
- public IntermediateLayer(
- LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
- throws QueryProcessException {
- this.parentLayerPointReader = parentLayerPointReader;
- dataType = parentLayerPointReader.getDataType();
- tvList =
- ElasticSerializableTVList.newElasticSerializableTVList(
- dataType, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
- safetyLine = new SafetyLine();
- }
-
- public LayerPointReader constructPointReader() {
-
- return new LayerPointReader() {
-
- private final SafetyPile safetyPile = safetyLine.addSafetyPile();
-
- private boolean hasCached = false;
- private int currentPointIndex = -1;
-
- @Override
- public boolean next() throws QueryProcessException, IOException {
- if (hasCached) {
- return true;
- }
-
- if (currentPointIndex < tvList.size() - 1) {
- ++currentPointIndex;
- hasCached = true;
- }
-
- // tvList.size() - 1 <= currentPointIndex
- if (!hasCached && parentLayerPointReader.next()) {
- cachePoint();
- parentLayerPointReader.readyForNext();
-
- ++currentPointIndex;
- hasCached = true;
- }
-
- return hasCached;
- }
-
- private void cachePoint() throws IOException, QueryProcessException {
- switch (dataType) {
- case INT32:
- tvList.putInt(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
- break;
- case INT64:
- tvList.putLong(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
- break;
- case FLOAT:
- tvList.putFloat(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
- break;
- case DOUBLE:
- tvList.putDouble(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
- break;
- case BOOLEAN:
- tvList.putBoolean(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
- break;
- case TEXT:
- tvList.putBinary(
- parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
- break;
- default:
- throw new UnsupportedOperationException(dataType.name());
- }
- }
-
- @Override
- public void readyForNext() {
- hasCached = false;
-
- safetyPile.moveForwardTo(currentPointIndex + 1);
- // todo: reduce the update rate
- tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
- }
-
- @Override
- public TSDataType getDataType() {
- return dataType;
- }
-
- @Override
- public long currentTime() throws IOException {
- return tvList.getTime(currentPointIndex);
- }
-
- @Override
- public int currentInt() throws IOException {
- return tvList.getInt(currentPointIndex);
- }
-
- @Override
- public long currentLong() throws IOException {
- return tvList.getLong(currentPointIndex);
- }
-
- @Override
- public float currentFloat() throws IOException {
- return tvList.getFloat(currentPointIndex);
- }
-
- @Override
- public double currentDouble() throws IOException {
- return tvList.getDouble(currentPointIndex);
- }
-
- @Override
- public boolean currentBoolean() throws IOException {
- return tvList.getBoolean(currentPointIndex);
- }
-
- @Override
- public Binary currentBinary() throws IOException {
- return tvList.getBinary(currentPointIndex);
- }
- };
- }
+ LayerPointReader constructPointReader();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputIntermediateLayer.java
new file mode 100644
index 0000000..260c308
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputIntermediateLayer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+import java.util.List;
+
+public class MultiInputIntermediateLayer implements IntermediateLayer {
+
+ public MultiInputIntermediateLayer(
+ List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {}
+
+ @Override
+ public LayerPointReader constructPointReader() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
similarity index 97%
copy from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
copy to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
index f4ccd03..04697c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
import java.io.IOException;
-public class IntermediateLayer {
+public class SingleInputIntermediateLayer implements IntermediateLayer {
private static final int CACHE_BLOCK_SIZE = 1;
@@ -37,7 +37,7 @@ public class IntermediateLayer {
private final ElasticSerializableTVList tvList;
private final SafetyLine safetyLine;
- public IntermediateLayer(
+ public SingleInputIntermediateLayer(
LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
throws QueryProcessException {
this.parentLayerPointReader = parentLayerPointReader;
@@ -48,6 +48,7 @@ public class IntermediateLayer {
safetyLine = new SafetyLine();
}
+ @Override
public LayerPointReader constructPointReader() {
return new LayerPointReader() {