You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/28 09:47:19 UTC

[iotdb] branch iotdb-1971 updated: finish preprocess workflow

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/iotdb-1971 by this push:
     new 7a8d55b  finish preprocess workflow
7a8d55b is described below

commit 7a8d55be856f6d81d2aadcba9201af4763f2a4f6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Nov 28 17:46:34 2021 +0800

    finish preprocess workflow
---
 .../query/dataset/udf/UDTFAlignByTimeDataSet.java  |  2 +-
 .../db/query/dataset/udf/UDTFFragmentDataSet.java  |  2 +-
 .../db/query/dataset/udf/UDTFJoinDataSet.java      | 12 +++-
 .../iotdb/db/query/expression/Expression.java      | 27 +++++++-
 .../query/expression/binary/BinaryExpression.java  | 79 +++++++++++-----------
 .../db/query/expression/unary/ConstantOperand.java | 17 ++---
 .../query/expression/unary/FunctionExpression.java | 59 ++++++++--------
 .../query/expression/unary/NegationExpression.java | 58 ++++++++--------
 .../query/expression/unary/TimeSeriesOperand.java  | 35 +++++-----
 .../udf/core/layer/ConstantIntermediateLayer.java  |  5 +-
 .../db/query/udf/core/layer/IntermediateLayer.java | 11 ++-
 .../db/query/udf/core/layer/LayerBuilder.java      | 55 +++++++++++++--
 .../layer/MultiInputColumnIntermediateLayer.java   |  3 +-
 ...InputColumnMultiReferenceIntermediateLayer.java |  3 +-
 ...nputColumnSingleReferenceIntermediateLayer.java |  3 +-
 15 files changed, 226 insertions(+), 145 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index 30571f6..31e2d9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -106,7 +106,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
     return layerBuilder != null && layerBuilder.canBeSplitIntoFragments();
   }
 
-  public QueryDataSet executeInFragmentsIfPossible() {
+  public QueryDataSet executeInFragmentsIfPossible() throws QueryProcessException, IOException {
     return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet() : this;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
index 3e5353c..a052bda 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 
 public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
 
-  protected UDTFFragmentDataSet(LayerPointReader[] transformers)
+  public UDTFFragmentDataSet(LayerPointReader[] transformers)
       throws QueryProcessException, IOException {
     super(transformers);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index b455b60..fb39323 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -20,15 +20,18 @@
 package org.apache.iotdb.db.query.dataset.udf;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
 
-// TODO: implements DirectAlignByTimeDataSet
 // TODO: performances joining in pool, packing row records while calculating
-public class UDTFJoinDataSet extends QueryDataSet {
+public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
 
   private final UDTFDataSet[] fragmentDataSets;
 
@@ -109,4 +112,9 @@ public class UDTFJoinDataSet extends QueryDataSet {
 
     return rowRecord;
   }
+
+  @Override
+  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) {
+    throw new NotImplementedException();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 5558b50..27df69b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -64,15 +64,38 @@ public abstract class Expression {
 
   public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
 
-  public abstract IntermediateLayer constructIntermediateLayer(
+  protected abstract void constructIntermediateLayerInternal(
       long queryId,
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
-      LayerMemoryAssigner memoryAssigner)
+      LayerMemoryAssigner memoryAssigner,
+      int fragmentDataSetIndex)
       throws QueryProcessException, IOException;
 
+  public final IntermediateLayer constructIntermediateLayer(
+      long queryId,
+      UDTFPlan udtfPlan,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      Map<Expression, TSDataType> expressionDataTypeMap,
+      LayerMemoryAssigner memoryAssigner,
+      int fragmentDataSetIndex)
+      throws QueryProcessException, IOException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      constructIntermediateLayerInternal(
+          queryId,
+          udtfPlan,
+          rawTimeSeriesInputLayer,
+          expressionIntermediateLayerMap,
+          expressionDataTypeMap,
+          memoryAssigner,
+          fragmentDataSetIndex);
+    }
+    return expressionIntermediateLayerMap.get(this);
+  }
+
   /** Sub-classes should override this method indicating if the expression is a constant operand */
   protected abstract boolean isConstantOperandInternal();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index df55256..176b746 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -137,52 +137,51 @@ public abstract class BinaryExpression extends Expression {
   }
 
   @Override
-  public IntermediateLayer constructIntermediateLayer(
+  protected void constructIntermediateLayerInternal(
       long queryId,
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
-      LayerMemoryAssigner memoryAssigner)
+      LayerMemoryAssigner memoryAssigner,
+      int fragmentDataSetIndex)
       throws QueryProcessException, IOException {
-    if (!expressionIntermediateLayerMap.containsKey(this)) {
-      float memoryBudgetInMB = memoryAssigner.assign();
-
-      IntermediateLayer leftParentIntermediateLayer =
-          leftExpression.constructIntermediateLayer(
-              queryId,
-              udtfPlan,
-              rawTimeSeriesInputLayer,
-              expressionIntermediateLayerMap,
-              expressionDataTypeMap,
-              memoryAssigner);
-      IntermediateLayer rightParentIntermediateLayer =
-          rightExpression.constructIntermediateLayer(
-              queryId,
-              udtfPlan,
-              rawTimeSeriesInputLayer,
-              expressionIntermediateLayerMap,
-              expressionDataTypeMap,
-              memoryAssigner);
-      Transformer transformer =
-          constructTransformer(
-              leftParentIntermediateLayer.constructPointReader(),
-              rightParentIntermediateLayer.constructPointReader());
-      expressionDataTypeMap.put(this, transformer.getDataType());
-
-      // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
-      // yet. And since a ConstantLayerPointReader won't produce too much IO,
-      // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
-      expressionIntermediateLayerMap.put(
-          this,
-          memoryAssigner.getReference(this) == 1 || isConstantOperand()
-              ? new SingleInputColumnSingleReferenceIntermediateLayer(
-                  this, queryId, memoryBudgetInMB, transformer)
-              : new SingleInputColumnMultiReferenceIntermediateLayer(
-                  this, queryId, memoryBudgetInMB, transformer));
-    }
-
-    return expressionIntermediateLayerMap.get(this);
+    float memoryBudgetInMB = memoryAssigner.assign();
+
+    IntermediateLayer leftParentIntermediateLayer =
+        leftExpression.constructIntermediateLayer(
+            queryId,
+            udtfPlan,
+            rawTimeSeriesInputLayer,
+            expressionIntermediateLayerMap,
+            expressionDataTypeMap,
+            memoryAssigner,
+            fragmentDataSetIndex);
+    IntermediateLayer rightParentIntermediateLayer =
+        rightExpression.constructIntermediateLayer(
+            queryId,
+            udtfPlan,
+            rawTimeSeriesInputLayer,
+            expressionIntermediateLayerMap,
+            expressionDataTypeMap,
+            memoryAssigner,
+            fragmentDataSetIndex);
+    Transformer transformer =
+        constructTransformer(
+            leftParentIntermediateLayer.constructPointReader(),
+            rightParentIntermediateLayer.constructPointReader());
+    expressionDataTypeMap.put(this, transformer.getDataType());
+
+    // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
+    // yet. And since a ConstantLayerPointReader won't produce too much IO,
+    // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
+    expressionIntermediateLayerMap.put(
+        this,
+        memoryAssigner.getReference(this) == 1 || isConstantOperand()
+            ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer)
+            : new SingleInputColumnMultiReferenceIntermediateLayer(
+                this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer));
   }
 
   protected abstract ArithmeticBinaryTransformer constructTransformer(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index cb8f61d..0a58321 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -86,22 +86,19 @@ public class ConstantOperand extends Expression {
   }
 
   @Override
-  public IntermediateLayer constructIntermediateLayer(
+  protected void constructIntermediateLayerInternal(
       long queryId,
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
-      LayerMemoryAssigner memoryAssigner)
+      LayerMemoryAssigner memoryAssigner,
+      int fragmentDataSetIndex)
       throws QueryProcessException {
-    if (!expressionIntermediateLayerMap.containsKey(this)) {
-      expressionDataTypeMap.put(this, this.getDataType());
-      IntermediateLayer intermediateLayer =
-          new ConstantIntermediateLayer(this, queryId, memoryAssigner.assign());
-      expressionIntermediateLayerMap.put(this, intermediateLayer);
-    }
-
-    return expressionIntermediateLayerMap.get(this);
+    expressionDataTypeMap.put(this, this.getDataType());
+    IntermediateLayer intermediateLayer =
+        new ConstantIntermediateLayer(this, queryId, memoryAssigner.assign(), fragmentDataSetIndex);
+    expressionIntermediateLayerMap.put(this, intermediateLayer);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index dbd8211..2217fe6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -200,40 +200,38 @@ public class FunctionExpression extends Expression {
   }
 
   @Override
-  public IntermediateLayer constructIntermediateLayer(
+  protected void constructIntermediateLayerInternal(
       long queryId,
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
-      LayerMemoryAssigner memoryAssigner)
+      LayerMemoryAssigner memoryAssigner,
+      int fragmentDataSetIndex)
       throws QueryProcessException, IOException {
-    if (!expressionIntermediateLayerMap.containsKey(this)) {
-      float memoryBudgetInMB = memoryAssigner.assign();
-
-      IntermediateLayer udfInputIntermediateLayer =
-          constructUdfInputIntermediateLayer(
-              queryId,
-              udtfPlan,
-              rawTimeSeriesInputLayer,
-              expressionIntermediateLayerMap,
-              expressionDataTypeMap,
-              memoryAssigner);
-      Transformer transformer =
-          constructUdfTransformer(
-              queryId, udtfPlan, expressionDataTypeMap, memoryAssigner, udfInputIntermediateLayer);
-      expressionDataTypeMap.put(this, transformer.getDataType());
-
-      expressionIntermediateLayerMap.put(
-          this,
-          memoryAssigner.getReference(this) == 1
-              ? new SingleInputColumnSingleReferenceIntermediateLayer(
-                  this, queryId, memoryBudgetInMB, transformer)
-              : new SingleInputColumnMultiReferenceIntermediateLayer(
-                  this, queryId, memoryBudgetInMB, transformer));
-    }
+    float memoryBudgetInMB = memoryAssigner.assign();
 
-    return expressionIntermediateLayerMap.get(this);
+    IntermediateLayer udfInputIntermediateLayer =
+        constructUdfInputIntermediateLayer(
+            queryId,
+            udtfPlan,
+            rawTimeSeriesInputLayer,
+            expressionIntermediateLayerMap,
+            expressionDataTypeMap,
+            memoryAssigner,
+            fragmentDataSetIndex);
+    Transformer transformer =
+        constructUdfTransformer(
+            queryId, udtfPlan, expressionDataTypeMap, memoryAssigner, udfInputIntermediateLayer);
+    expressionDataTypeMap.put(this, transformer.getDataType());
+
+    expressionIntermediateLayerMap.put(
+        this,
+        memoryAssigner.getReference(this) == 1
+            ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer)
+            : new SingleInputColumnMultiReferenceIntermediateLayer(
+                this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer));
   }
 
   private IntermediateLayer constructUdfInputIntermediateLayer(
@@ -242,7 +240,8 @@ public class FunctionExpression extends Expression {
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
-      LayerMemoryAssigner memoryAssigner)
+      LayerMemoryAssigner memoryAssigner,
+      int fragmentDataSetIndex)
       throws QueryProcessException, IOException {
     List<IntermediateLayer> intermediateLayers = new ArrayList<>();
     for (Expression expression : expressions) {
@@ -253,7 +252,8 @@ public class FunctionExpression extends Expression {
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
-              memoryAssigner));
+              memoryAssigner,
+              fragmentDataSetIndex));
     }
     return intermediateLayers.size() == 1
         ? intermediateLayers.get(0)
@@ -261,6 +261,7 @@ public class FunctionExpression extends Expression {
             this,
             queryId,
             memoryAssigner.assign(),
+            fragmentDataSetIndex,
             intermediateLayers.stream()
                 .map(IntermediateLayer::constructPointReader)
                 .collect(Collectors.toList()));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 2a514ba..39f4bab 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -101,42 +101,40 @@ public class NegationExpression extends Expression {
   }
 
   @Override
-  public IntermediateLayer constructIntermediateLayer(
+  protected void constructIntermediateLayerInternal(
       long queryId,
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
-      LayerMemoryAssigner memoryAssigner)
+      LayerMemoryAssigner memoryAssigner,
+      int fragmentDataSetIndex)
       throws QueryProcessException, IOException {
-    if (!expressionIntermediateLayerMap.containsKey(this)) {
-      float memoryBudgetInMB = memoryAssigner.assign();
-
-      IntermediateLayer parentLayerPointReader =
-          expression.constructIntermediateLayer(
-              queryId,
-              udtfPlan,
-              rawTimeSeriesInputLayer,
-              expressionIntermediateLayerMap,
-              expressionDataTypeMap,
-              memoryAssigner);
-      Transformer transformer =
-          new ArithmeticNegationTransformer(parentLayerPointReader.constructPointReader());
-      expressionDataTypeMap.put(this, transformer.getDataType());
-
-      // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
-      // yet. And since a ConstantLayerPointReader won't produce too much IO,
-      // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
-      expressionIntermediateLayerMap.put(
-          this,
-          memoryAssigner.getReference(this) == 1 || isConstantOperand()
-              ? new SingleInputColumnSingleReferenceIntermediateLayer(
-                  this, queryId, memoryBudgetInMB, transformer)
-              : new SingleInputColumnMultiReferenceIntermediateLayer(
-                  this, queryId, memoryBudgetInMB, transformer));
-    }
-
-    return expressionIntermediateLayerMap.get(this);
+    float memoryBudgetInMB = memoryAssigner.assign();
+
+    IntermediateLayer parentLayerPointReader =
+        expression.constructIntermediateLayer(
+            queryId,
+            udtfPlan,
+            rawTimeSeriesInputLayer,
+            expressionIntermediateLayerMap,
+            expressionDataTypeMap,
+            memoryAssigner,
+            fragmentDataSetIndex);
+    Transformer transformer =
+        new ArithmeticNegationTransformer(parentLayerPointReader.constructPointReader());
+    expressionDataTypeMap.put(this, transformer.getDataType());
+
+    // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
+    // yet. And since a ConstantLayerPointReader won't produce too much IO,
+    // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
+    expressionIntermediateLayerMap.put(
+        this,
+        memoryAssigner.getReference(this) == 1 || isConstantOperand()
+            ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer)
+            : new SingleInputColumnMultiReferenceIntermediateLayer(
+                this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 75e893e..9c65d48 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -92,31 +92,28 @@ public class TimeSeriesOperand extends Expression {
   }
 
   @Override
-  public IntermediateLayer constructIntermediateLayer(
+  protected void constructIntermediateLayerInternal(
       long queryId,
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
-      LayerMemoryAssigner memoryAssigner)
+      LayerMemoryAssigner memoryAssigner,
+      int fragmentDataSetIndex)
       throws QueryProcessException {
-    if (!expressionIntermediateLayerMap.containsKey(this)) {
-      float memoryBudgetInMB = memoryAssigner.assign();
-
-      LayerPointReader parentLayerPointReader =
-          rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path));
-      expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
-
-      expressionIntermediateLayerMap.put(
-          this,
-          memoryAssigner.getReference(this) == 1
-              ? new SingleInputColumnSingleReferenceIntermediateLayer(
-                  this, queryId, memoryBudgetInMB, parentLayerPointReader)
-              : new SingleInputColumnMultiReferenceIntermediateLayer(
-                  this, queryId, memoryBudgetInMB, parentLayerPointReader));
-    }
-
-    return expressionIntermediateLayerMap.get(this);
+    float memoryBudgetInMB = memoryAssigner.assign();
+
+    LayerPointReader parentLayerPointReader =
+        rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path));
+    expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
+
+    expressionIntermediateLayerMap.put(
+        this,
+        memoryAssigner.getReference(this) == 1
+            ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                this, queryId, memoryBudgetInMB, fragmentDataSetIndex, parentLayerPointReader)
+            : new SingleInputColumnMultiReferenceIntermediateLayer(
+                this, queryId, memoryBudgetInMB, fragmentDataSetIndex, parentLayerPointReader));
   }
 
   public String getExpressionStringInternal() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/ConstantIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/ConstantIntermediateLayer.java
index 70f717e..69b9d4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/ConstantIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/ConstantIntermediateLayer.java
@@ -33,9 +33,10 @@ public class ConstantIntermediateLayer extends IntermediateLayer {
 
   private final LayerPointReader constantLayerPointReaderCache;
 
-  public ConstantIntermediateLayer(ConstantOperand expression, long queryId, float memoryBudgetInMB)
+  public ConstantIntermediateLayer(
+      ConstantOperand expression, long queryId, float memoryBudgetInMB, int fragmentDataSetIndex)
       throws QueryProcessException {
-    super(expression, queryId, memoryBudgetInMB);
+    super(expression, queryId, memoryBudgetInMB, fragmentDataSetIndex);
     constantLayerPointReaderCache = new ConstantLayerPointReader(expression);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index 890a6b4..7772cb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -40,10 +40,15 @@ public abstract class IntermediateLayer {
   protected final long queryId;
   protected final float memoryBudgetInMB;
 
-  protected IntermediateLayer(Expression expression, long queryId, float memoryBudgetInMB) {
+  // used to mark the dataset fragment it belongs to
+  protected final int fragmentDataSetIndex;
+
+  protected IntermediateLayer(
+      Expression expression, long queryId, float memoryBudgetInMB, int fragmentDataSetIndex) {
     this.expression = expression;
     this.queryId = queryId;
     this.memoryBudgetInMB = memoryBudgetInMB;
+    this.fragmentDataSetIndex = fragmentDataSetIndex;
   }
 
   public abstract LayerPointReader constructPointReader();
@@ -73,6 +78,10 @@ public abstract class IntermediateLayer {
       SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
       throws QueryProcessException, IOException;
 
+  public int getFragmentDataSetIndex() {
+    return fragmentDataSetIndex;
+  }
+
   @Override
   public String toString() {
     return expression.toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index 178cfba..b46b146 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.query.dataset.udf.UDTFDataSet;
+import org.apache.iotdb.db.query.dataset.udf.UDTFFragmentDataSet;
+import org.apache.iotdb.db.query.dataset.udf.UDTFJoinDataSet;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
@@ -28,7 +31,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class LayerBuilder {
@@ -51,6 +56,11 @@ public class LayerBuilder {
   private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
   private final Map<Expression, TSDataType> expressionDataTypeMap;
 
+  // used to split query dataset into fragments.
+  // useless when the dataset can not be split into fragments.
+  private final List<List<LayerPointReader>> fragmentDataSetIndexToLayerPointReaders;
+  private final int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex;
+
   public LayerBuilder(
       long queryId, UDTFPlan udtfPlan, RawQueryInputLayer inputLayer, float memoryBudgetInMB) {
     this.queryId = queryId;
@@ -68,6 +78,9 @@ public class LayerBuilder {
 
     expressionIntermediateLayerMap = new HashMap<>();
     expressionDataTypeMap = new HashMap<>();
+
+    fragmentDataSetIndexToLayerPointReaders = new ArrayList<>();
+    resultColumnOutputIndexToFragmentDataSetOutputIndex = new int[resultColumnExpressions.length][];
   }
 
   public LayerBuilder buildLayerMemoryAssigner() {
@@ -79,7 +92,19 @@ public class LayerBuilder {
   }
 
   public LayerBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
-    for (int i = 0; i < resultColumnExpressions.length; ++i) {
+    for (int i = 0, n = resultColumnExpressions.length; i < n; ++i) {
+      // resultColumnExpressions[i] -> the index of the fragment it belongs to
+      int fragmentDataSetIndex;
+      IntermediateLayer intermediateLayer =
+          expressionIntermediateLayerMap.get(resultColumnExpressions[i]);
+      if (intermediateLayer != null) {
+        fragmentDataSetIndex = intermediateLayer.getFragmentDataSetIndex();
+      } else {
+        fragmentDataSetIndex = fragmentDataSetIndexToLayerPointReaders.size();
+        fragmentDataSetIndexToLayerPointReaders.add(new ArrayList<>());
+      }
+
+      // build point readers
       resultColumnPointReaders[i] =
           resultColumnExpressions[i]
               .constructIntermediateLayer(
@@ -88,8 +113,18 @@ public class LayerBuilder {
                   rawTimeSeriesInputLayer,
                   expressionIntermediateLayerMap,
                   expressionDataTypeMap,
-                  memoryAssigner)
+                  memoryAssigner,
+                  fragmentDataSetIndex)
               .constructPointReader();
+
+      // collect layer point readers for fragments
+      List<LayerPointReader> layerPointReadersInFragmentDataSet =
+          fragmentDataSetIndexToLayerPointReaders.get(fragmentDataSetIndex);
+      // note that expressions in resultColumnExpressions are all unique
+      // see UDTFPlan#deduplicate() for more detail
+      resultColumnOutputIndexToFragmentDataSetOutputIndex[i] =
+          new int[] {fragmentDataSetIndex, layerPointReadersInFragmentDataSet.size()};
+      layerPointReadersInFragmentDataSet.add(resultColumnPointReaders[i]);
     }
     return this;
   }
@@ -105,11 +140,21 @@ public class LayerBuilder {
     return resultColumnPointReaders;
   }
 
+  /** TODO: make it configurable */
   public boolean canBeSplitIntoFragments() {
-    return false;
+    return 4 <= fragmentDataSetIndexToLayerPointReaders.size();
   }
 
-  public QueryDataSet generateJoinDataSet() {
-    return null;
+  public QueryDataSet generateJoinDataSet() throws QueryProcessException, IOException {
+    int n = fragmentDataSetIndexToLayerPointReaders.size();
+    UDTFDataSet[] fragmentDataSets = new UDTFDataSet[n];
+    for (int i = 0; i < n; ++i) {
+      fragmentDataSets[i] =
+          new UDTFFragmentDataSet(
+              fragmentDataSetIndexToLayerPointReaders.get(i).toArray(new LayerPointReader[0]));
+    }
+
+    return new UDTFJoinDataSet(
+        fragmentDataSets, resultColumnOutputIndexToFragmentDataSetOutputIndex);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
index fb36a5b..e671995 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
@@ -51,9 +51,10 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
       Expression expression,
       long queryId,
       float memoryBudgetInMB,
+      int fragmentDataSetIndex,
       List<LayerPointReader> parentLayerPointReaders)
       throws QueryProcessException, IOException {
-    super(expression, queryId, memoryBudgetInMB);
+    super(expression, queryId, memoryBudgetInMB, fragmentDataSetIndex);
 
     layerPointReaders = parentLayerPointReaders.toArray(new LayerPointReader[0]);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
index 03636ed..f712346 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -48,9 +48,10 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
       Expression expression,
       long queryId,
       float memoryBudgetInMB,
+      int fragmentDataSetIndex,
       LayerPointReader parentLayerPointReader)
       throws QueryProcessException {
-    super(expression, queryId, memoryBudgetInMB);
+    super(expression, queryId, memoryBudgetInMB, fragmentDataSetIndex);
     this.parentLayerPointReader = parentLayerPointReader;
 
     dataType = parentLayerPointReader.getDataType();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
index be5d41c..124a286 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -44,8 +44,9 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       Expression expression,
       long queryId,
       float memoryBudgetInMB,
+      int fragmentDataSetIndex,
       LayerPointReader parentLayerPointReader) {
-    super(expression, queryId, memoryBudgetInMB);
+    super(expression, queryId, memoryBudgetInMB, fragmentDataSetIndex);
     this.parentLayerPointReader = parentLayerPointReader;
     dataType = parentLayerPointReader.getDataType();
   }