You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/27 15:46:32 UTC

[iotdb] 05/05: main flow

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 49e950c451ecf8686b21fda8cc435bddee7e0144
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sat Nov 27 23:43:39 2021 +0800

    main flow
---
 .../db/query/dataset/udf/UDTFAlignByTimeDataSet.java  | 14 ++++++++++++++
 .../iotdb/db/query/dataset/udf/UDTFDataSet.java       | 19 +++++++++----------
 .../iotdb/db/query/dataset/udf/UDTFJoinDataSet.java   |  1 +
 .../iotdb/db/query/executor/UDTFQueryExecutor.java    |  6 ++++--
 .../core/layer/{DAGBuilder.java => LayerBuilder.java} | 19 ++++++++++++++-----
 5 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index 6484870..30571f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -86,6 +87,11 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
   }
 
   protected void initTimeHeap() throws IOException, QueryProcessException {
+    // if the dataset can be split, we use UDTFJoinDataSet to pack the results.
+    if (canBeSplitIntoFragments()) {
+      return;
+    }
+
     timeHeap = new TimeSelector(transformers.length << 1, true);
     for (LayerPointReader reader : transformers) {
       // Since a constant operand is not allowed to be a result column, the reader will not be
@@ -96,6 +102,14 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
     }
   }
 
+  public boolean canBeSplitIntoFragments() {
+    return layerBuilder != null && layerBuilder.canBeSplitIntoFragments();
+  }
+
+  public QueryDataSet executeInFragmentsIfPossible() {
+    return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet() : this;
+  }
+
   @Override
   public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
       throws IOException, QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
index 8f791df..5700336 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.db.query.udf.core.layer.DAGBuilder;
+import org.apache.iotdb.db.query.udf.core.layer.LayerBuilder;
 import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
@@ -53,6 +53,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
   protected final UDTFPlan udtfPlan;
   protected final RawQueryInputLayer rawQueryInputLayer;
 
+  protected LayerBuilder layerBuilder;
   protected LayerPointReader[] transformers;
 
   /** with value filters */
@@ -109,12 +110,14 @@ public abstract class UDTFDataSet extends QueryDataSet {
     UDFClassLoaderManager.getInstance().initializeUDFQuery(queryId);
     try {
       // UDF executors will be initialized at the same time
+      layerBuilder =
+          new LayerBuilder(
+              queryId,
+              udtfPlan,
+              rawQueryInputLayer,
+              UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
       transformers =
-          new DAGBuilder(
-                  queryId,
-                  udtfPlan,
-                  rawQueryInputLayer,
-                  UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+          layerBuilder
               .buildLayerMemoryAssigner()
               .buildResultColumnPointReaders()
               .setDataSetResultColumnDataTypes()
@@ -137,8 +140,4 @@ public abstract class UDTFDataSet extends QueryDataSet {
   public void finalizeUDFs(long queryId) {
     udtfPlan.finalizeUDFExecutors(queryId);
   }
-
-  public UDTFPlan getUdtfPlan() {
-    return udtfPlan;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index 9c33b04..b455b60 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import java.io.IOException;
 
 // TODO: implements DirectAlignByTimeDataSet
+// TODO (performance): do the joining in a pool, packing row records while calculating
 public class UDTFJoinDataSet extends QueryDataSet {
 
   private final UDTFDataSet[] fragmentDataSets;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
index 1ff3848..53b51db 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
@@ -48,7 +48,8 @@ public class UDTFQueryExecutor extends RawDataQueryExecutor {
   public QueryDataSet executeWithoutValueFilterAlignByTime(QueryContext context)
       throws StorageEngineException, QueryProcessException, IOException, InterruptedException {
     List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
-    return new UDTFAlignByTimeDataSet(context, udtfPlan, readersOfSelectedSeries);
+    return new UDTFAlignByTimeDataSet(context, udtfPlan, readersOfSelectedSeries)
+        .executeInFragmentsIfPossible();
   }
 
   public QueryDataSet executeWithValueFilterAlignByTime(QueryContext context)
@@ -62,7 +63,8 @@ public class UDTFQueryExecutor extends RawDataQueryExecutor {
     List<IReaderByTimestamp> readersOfSelectedSeries =
         initSeriesReaderByTimestamp(context, udtfPlan, cached);
     return new UDTFAlignByTimeDataSet(
-        context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+            context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached)
+        .executeInFragmentsIfPossible();
   }
 
   public QueryDataSet executeWithoutValueFilterNonAlign(QueryContext context)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index 1847534..178cfba 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -25,12 +25,13 @@ import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-public class DAGBuilder {
+public class LayerBuilder {
 
   private final long queryId;
   private final UDTFPlan udtfPlan;
@@ -50,7 +51,7 @@ public class DAGBuilder {
   private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
   private final Map<Expression, TSDataType> expressionDataTypeMap;
 
-  public DAGBuilder(
+  public LayerBuilder(
       long queryId, UDTFPlan udtfPlan, RawQueryInputLayer inputLayer, float memoryBudgetInMB) {
     this.queryId = queryId;
     this.udtfPlan = udtfPlan;
@@ -69,7 +70,7 @@ public class DAGBuilder {
     expressionDataTypeMap = new HashMap<>();
   }
 
-  public DAGBuilder buildLayerMemoryAssigner() {
+  public LayerBuilder buildLayerMemoryAssigner() {
     for (Expression expression : resultColumnExpressions) {
       expression.updateStatisticsForMemoryAssigner(memoryAssigner);
     }
@@ -77,7 +78,7 @@ public class DAGBuilder {
     return this;
   }
 
-  public DAGBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
+  public LayerBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
     for (int i = 0; i < resultColumnExpressions.length; ++i) {
       resultColumnPointReaders[i] =
           resultColumnExpressions[i]
@@ -93,7 +94,7 @@ public class DAGBuilder {
     return this;
   }
 
-  public DAGBuilder setDataSetResultColumnDataTypes() {
+  public LayerBuilder setDataSetResultColumnDataTypes() {
     for (ResultColumn resultColumn : udtfPlan.getResultColumns()) {
       resultColumn.setDataType(expressionDataTypeMap.get(resultColumn.getExpression()));
     }
@@ -103,4 +104,12 @@ public class DAGBuilder {
   public LayerPointReader[] getResultColumnPointReaders() {
     return resultColumnPointReaders;
   }
+
+  public boolean canBeSplitIntoFragments() {
+    return false;
+  }
+
+  public QueryDataSet generateJoinDataSet() {
+    return null;
+  }
 }