You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/02 08:13:40 UTC

[iotdb] 03/04: introduce MultiInputIntermediateLayer

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit de1c509c853145f8217f7ef6afee03fd9c21bfdd
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 1 18:50:33 2021 +0800

    introduce MultiInputIntermediateLayer
---
 .../query/expression/binary/BinaryExpression.java  |   3 +-
 .../query/expression/unary/FunctionExpression.java |  11 +-
 .../query/expression/unary/NegationExpression.java |   3 +-
 .../query/expression/unary/TimeSeriesOperand.java  |   3 +-
 .../db/query/udf/core/layer/IntermediateLayer.java | 141 +--------------------
 .../core/layer/MultiInputIntermediateLayer.java    |  35 +++++
 ...ayer.java => SingleInputIntermediateLayer.java} |   5 +-
 7 files changed, 55 insertions(+), 146 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 4d52793..69dd13c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
 
@@ -125,7 +126,7 @@ public abstract class BinaryExpression extends Expression {
 
       expressionIntermediateLayerMap.put(
           this,
-          new IntermediateLayer(
+          new SingleInputIntermediateLayer(
               constructTransformer(
                   leftParentIntermediateLayer.constructPointReader(),
                   rightParentIntermediateLayer.constructPointReader()),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 4773ae4..7f55995 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.MultiInputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -153,11 +155,16 @@ public class FunctionExpression extends Expression {
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
       throws QueryProcessException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
+      List<LayerPointReader> parentLayerPointReaders = new ArrayList<>();
       for (Expression expression : expressions) {
-        expression.constructIntermediateLayer(udtfPlan, inputLayer, expressionIntermediateLayerMap);
+        parentLayerPointReaders.add(
+            expression
+                .constructIntermediateLayer(udtfPlan, inputLayer, expressionIntermediateLayerMap)
+                .constructPointReader());
       }
 
-      // todo!
+      expressionIntermediateLayerMap.put(
+          this, new MultiInputIntermediateLayer(parentLayerPointReaders, -1, -1));
     }
 
     return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 355eca0..813ee5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
 
 import java.util.ArrayList;
@@ -88,7 +89,7 @@ public class NegationExpression extends Expression {
 
       expressionIntermediateLayerMap.put(
           this,
-          new IntermediateLayer(
+          new SingleInputIntermediateLayer(
               new ArithmeticNegationTransformer(parentIntermediateLayer.constructPointReader()),
               -1,
               -1));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 208a760..c1e6eb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
@@ -79,7 +80,7 @@ public class TimeSeriesOperand extends Expression {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
       expressionIntermediateLayerMap.put(
           this,
-          new IntermediateLayer(
+          new SingleInputIntermediateLayer(
               inputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath())),
               -1,
               -1));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index f4ccd03..cbac211 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -19,146 +19,9 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
 
-import java.io.IOException;
+public interface IntermediateLayer {
 
-public class IntermediateLayer {
-
-  private static final int CACHE_BLOCK_SIZE = 1;
-
-  private final TSDataType dataType;
-  private final LayerPointReader parentLayerPointReader;
-  private final ElasticSerializableTVList tvList;
-  private final SafetyLine safetyLine;
-
-  public IntermediateLayer(
-      LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
-      throws QueryProcessException {
-    this.parentLayerPointReader = parentLayerPointReader;
-    dataType = parentLayerPointReader.getDataType();
-    tvList =
-        ElasticSerializableTVList.newElasticSerializableTVList(
-            dataType, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
-    safetyLine = new SafetyLine();
-  }
-
-  public LayerPointReader constructPointReader() {
-
-    return new LayerPointReader() {
-
-      private final SafetyPile safetyPile = safetyLine.addSafetyPile();
-
-      private boolean hasCached = false;
-      private int currentPointIndex = -1;
-
-      @Override
-      public boolean next() throws QueryProcessException, IOException {
-        if (hasCached) {
-          return true;
-        }
-
-        if (currentPointIndex < tvList.size() - 1) {
-          ++currentPointIndex;
-          hasCached = true;
-        }
-
-        // tvList.size() - 1 <= currentPointIndex
-        if (!hasCached && parentLayerPointReader.next()) {
-          cachePoint();
-          parentLayerPointReader.readyForNext();
-
-          ++currentPointIndex;
-          hasCached = true;
-        }
-
-        return hasCached;
-      }
-
-      private void cachePoint() throws IOException, QueryProcessException {
-        switch (dataType) {
-          case INT32:
-            tvList.putInt(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
-            break;
-          case INT64:
-            tvList.putLong(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
-            break;
-          case FLOAT:
-            tvList.putFloat(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
-            break;
-          case DOUBLE:
-            tvList.putDouble(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
-            break;
-          case BOOLEAN:
-            tvList.putBoolean(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
-            break;
-          case TEXT:
-            tvList.putBinary(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
-            break;
-          default:
-            throw new UnsupportedOperationException(dataType.name());
-        }
-      }
-
-      @Override
-      public void readyForNext() {
-        hasCached = false;
-
-        safetyPile.moveForwardTo(currentPointIndex + 1);
-        // todo: reduce the update rate
-        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
-      }
-
-      @Override
-      public TSDataType getDataType() {
-        return dataType;
-      }
-
-      @Override
-      public long currentTime() throws IOException {
-        return tvList.getTime(currentPointIndex);
-      }
-
-      @Override
-      public int currentInt() throws IOException {
-        return tvList.getInt(currentPointIndex);
-      }
-
-      @Override
-      public long currentLong() throws IOException {
-        return tvList.getLong(currentPointIndex);
-      }
-
-      @Override
-      public float currentFloat() throws IOException {
-        return tvList.getFloat(currentPointIndex);
-      }
-
-      @Override
-      public double currentDouble() throws IOException {
-        return tvList.getDouble(currentPointIndex);
-      }
-
-      @Override
-      public boolean currentBoolean() throws IOException {
-        return tvList.getBoolean(currentPointIndex);
-      }
-
-      @Override
-      public Binary currentBinary() throws IOException {
-        return tvList.getBinary(currentPointIndex);
-      }
-    };
-  }
+  LayerPointReader constructPointReader();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputIntermediateLayer.java
new file mode 100644
index 0000000..260c308
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputIntermediateLayer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+import java.util.List;
+
+public class MultiInputIntermediateLayer implements IntermediateLayer {
+
+  public MultiInputIntermediateLayer(
+      List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {}
+
+  @Override
+  public LayerPointReader constructPointReader() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
similarity index 97%
copy from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
copy to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
index f4ccd03..04697c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
 
-public class IntermediateLayer {
+public class SingleInputIntermediateLayer implements IntermediateLayer {
 
   private static final int CACHE_BLOCK_SIZE = 1;
 
@@ -37,7 +37,7 @@ public class IntermediateLayer {
   private final ElasticSerializableTVList tvList;
   private final SafetyLine safetyLine;
 
-  public IntermediateLayer(
+  public SingleInputIntermediateLayer(
       LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
       throws QueryProcessException {
     this.parentLayerPointReader = parentLayerPointReader;
@@ -48,6 +48,7 @@ public class IntermediateLayer {
     safetyLine = new SafetyLine();
   }
 
+  @Override
   public LayerPointReader constructPointReader() {
 
     return new LayerPointReader() {