You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/12/02 04:18:27 UTC

[iotdb] branch iotdb-1971 updated (1da1a6b -> 834dd92)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from 1da1a6b  fragment task dataset & join dataset
     new 4f9d53a  fix bug when splitting
     new 9086826  init QueryDataSet and fix bug
     new afb1d5f  move packBuffer and putPBOSToBuffer into QueryDataSetUtils
     new f678504  pass tests when canBeSplitIntoFragments := 2 <= fragmentDataSetIndexToLayerPointReaders.size()
     new 65e6715  udf_min_fragment_number_to_trigger_parallel_execution
     new 834dd92  udf_min_fragment_number_to_trigger_parallel_execution docs

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../Advanced-Features/UDF-User-Defined-Function.md |   3 +-
 .../Advanced-Features/UDF-User-Defined-Function.md |   3 +-
 .../resources/conf/iotdb-engine.properties         |   5 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  17 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  13 ++
 .../query/dataset/udf/UDTFAlignByTimeDataSet.java  |  48 +-----
 .../iotdb/db/query/dataset/udf/UDTFDataSet.java    |   6 +-
 .../db/query/dataset/udf/UDTFFragmentDataSet.java  |  11 +-
 .../query/dataset/udf/UDTFFragmentDataSetTask.java |  17 +-
 .../db/query/dataset/udf/UDTFJoinDataSet.java      | 182 +++++++++++++++++++--
 .../iotdb/db/query/expression/Expression.java      |   3 +
 .../query/expression/binary/BinaryExpression.java  |  14 ++
 .../db/query/expression/unary/ConstantOperand.java |   7 +
 .../query/expression/unary/FunctionExpression.java |  18 ++
 .../query/expression/unary/NegationExpression.java |   9 +
 .../query/expression/unary/TimeSeriesOperand.java  |   7 +
 .../pool/DataSetFragmentExecutionPoolManager.java  |   8 +-
 .../db/query/udf/core/layer/LayerBuilder.java      |  26 +--
 .../query/udf/core/layer/RawQueryInputLayer.java   |  36 ++--
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  33 ++++
 .../tsfile/read/query/dataset/QueryDataSet.java    |  18 ++
 21 files changed, 384 insertions(+), 100 deletions(-)

[iotdb] 03/06: move packBuffer and putPBOSToBuffer into QueryDataSetUtils

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit afb1d5f33ce9c401e5c08248a6bdf33e9b925125
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Dec 1 18:04:56 2021 +0800

    move packBuffer and putPBOSToBuffer into QueryDataSetUtils
---
 .../query/dataset/udf/UDTFAlignByTimeDataSet.java  |  40 +-----
 .../db/query/dataset/udf/UDTFJoinDataSet.java      | 140 +++++++++++++++++++--
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  33 +++++
 3 files changed, 168 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index 7572516..7c7f1bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -40,8 +41,6 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 
 public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignByTimeDataSet {
@@ -109,8 +108,6 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
   }
 
   public QueryDataSet executeInFragmentsIfPossible() throws QueryProcessException, IOException {
-    // TODO make the behaviour of the return value of layerBuilder.generateJoinDataSet() the same as
-    // TODO the original dataset
     return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet(this) : this;
   }
 
@@ -237,39 +234,8 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
       }
     }
 
-    return packBuffer(tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList);
-  }
-
-  protected TSQueryDataSet packBuffer(
-      TSQueryDataSet tsQueryDataSet,
-      PublicBAOS timeBAOS,
-      PublicBAOS[] valueBAOSList,
-      PublicBAOS[] bitmapBAOSList) {
-    int columnsNum = transformers.length;
-
-    ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
-    timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
-    timeBuffer.flip();
-    tsQueryDataSet.setTime(timeBuffer);
-
-    List<ByteBuffer> valueBufferList = new ArrayList<>();
-    List<ByteBuffer> bitmapBufferList = new ArrayList<>();
-    for (int i = 0; i < columnsNum; ++i) {
-      putPBOSToBuffer(valueBAOSList, valueBufferList, i);
-      putPBOSToBuffer(bitmapBAOSList, bitmapBufferList, i);
-    }
-    tsQueryDataSet.setValueList(valueBufferList);
-    tsQueryDataSet.setBitmapList(bitmapBufferList);
-
-    return tsQueryDataSet;
-  }
-
-  protected void putPBOSToBuffer(
-      PublicBAOS[] bitmapBAOSList, List<ByteBuffer> bitmapBufferList, int tsIndex) {
-    ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapBAOSList[tsIndex].size());
-    bitmapBuffer.put(bitmapBAOSList[tsIndex].getBuf(), 0, bitmapBAOSList[tsIndex].size());
-    bitmapBuffer.flip();
-    bitmapBufferList.add(bitmapBuffer);
+    return QueryDataSetUtils.packBuffer(
+        tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList, transformers.length);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index f92f600..1e7bd02 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -19,15 +19,19 @@
 
 package org.apache.iotdb.db.query.dataset.udf;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import java.io.IOException;
 
-public class UDTFJoinDataSet extends QueryDataSet
-//    implements DirectAlignByTimeDataSet
-{
+public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
 
   private final UDTFFragmentDataSet[] fragmentDataSets;
 
@@ -77,6 +81,131 @@ public class UDTFJoinDataSet extends QueryDataSet
   }
 
   @Override
+  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
+      throws IOException, QueryProcessException {
+    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+
+    PublicBAOS timeBAOS = new PublicBAOS();
+    PublicBAOS[] valueBAOSList = new PublicBAOS[resultColumnsLength];
+    PublicBAOS[] bitmapBAOSList = new PublicBAOS[resultColumnsLength];
+    for (int i = 0; i < resultColumnsLength; ++i) {
+      valueBAOSList[i] = new PublicBAOS();
+      bitmapBAOSList[i] = new PublicBAOS();
+    }
+    int[] currentBitmapList = new int[resultColumnsLength];
+
+    //    int rowCount = 0;
+    //    while (rowCount < fetchSize
+    //        && (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit)
+    //        && !timeHeap.isEmpty()) {
+    //
+    //      long minTime = timeHeap.pollFirst();
+    //      if (rowOffset == 0) {
+    //        timeBAOS.write(BytesUtils.longToBytes(minTime));
+    //      }
+    //
+    //      for (int i = 0; i < resultColumnsLength; ++i) {
+    //        LayerPointReader reader = transformers[i];
+    //
+    //        if (!reader.next() || reader.currentTime() != minTime) {
+    //          if (rowOffset == 0) {
+    //            currentBitmapList[i] = (currentBitmapList[i] << 1);
+    //          }
+    //          continue;
+    //        }
+    //
+    //        if (rowOffset == 0) {
+    //          currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG;
+    //          TSDataType type = reader.getDataType();
+    //          switch (type) {
+    //            case INT32:
+    //              int intValue = reader.currentInt();
+    //              ReadWriteIOUtils.write(
+    //                  encoder != null && encoder.needEncode(minTime)
+    //                      ? encoder.encodeInt(intValue, minTime)
+    //                      : intValue,
+    //                  valueBAOSList[i]);
+    //              break;
+    //            case INT64:
+    //              long longValue = reader.currentLong();
+    //              ReadWriteIOUtils.write(
+    //                  encoder != null && encoder.needEncode(minTime)
+    //                      ? encoder.encodeLong(longValue, minTime)
+    //                      : longValue,
+    //                  valueBAOSList[i]);
+    //              break;
+    //            case FLOAT:
+    //              float floatValue = reader.currentFloat();
+    //              ReadWriteIOUtils.write(
+    //                  encoder != null && encoder.needEncode(minTime)
+    //                      ? encoder.encodeFloat(floatValue, minTime)
+    //                      : floatValue,
+    //                  valueBAOSList[i]);
+    //              break;
+    //            case DOUBLE:
+    //              double doubleValue = reader.currentDouble();
+    //              ReadWriteIOUtils.write(
+    //                  encoder != null && encoder.needEncode(minTime)
+    //                      ? encoder.encodeDouble(doubleValue, minTime)
+    //                      : doubleValue,
+    //                  valueBAOSList[i]);
+    //              break;
+    //            case BOOLEAN:
+    //              ReadWriteIOUtils.write(reader.currentBoolean(), valueBAOSList[i]);
+    //              break;
+    //            case TEXT:
+    //              ReadWriteIOUtils.write(reader.currentBinary(), valueBAOSList[i]);
+    //              break;
+    //            default:
+    //              throw new UnSupportedDataTypeException(
+    //                  String.format("Data type %s is not supported.", type));
+    //          }
+    //        }
+    //
+    //        reader.readyForNext();
+    //
+    //        if (reader.next()) {
+    //          timeHeap.add(reader.currentTime());
+    //        }
+    //      }
+    //
+    //      if (rowOffset == 0) {
+    //        ++rowCount;
+    //        if (rowCount % 8 == 0) {
+    //          for (int i = 0; i < resultColumnsLength; ++i) {
+    //            ReadWriteIOUtils.write((byte) currentBitmapList[i], bitmapBAOSList[i]);
+    //            currentBitmapList[i] = 0;
+    //          }
+    //        }
+    //        if (rowLimit > 0) {
+    //          ++alreadyReturnedRowNum;
+    //        }
+    //      } else {
+    //        --rowOffset;
+    //      }
+    //
+    //      rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
+    //    }
+    //
+    //    /*
+    //     * feed the bitmap with remaining 0 in the right
+    //     * if current bitmap is 00011111 and remaining is 3, after feeding the bitmap is 11111000
+    //     */
+    //    if (rowCount > 0) {
+    //      int remaining = rowCount % 8;
+    //      if (remaining != 0) {
+    //        for (int i = 0; i < resultColumnsLength; ++i) {
+    //          ReadWriteIOUtils.write(
+    //              (byte) (currentBitmapList[i] << (8 - remaining)), bitmapBAOSList[i]);
+    //        }
+    //      }
+    //    }
+
+    return QueryDataSetUtils.packBuffer(
+        tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList, resultColumnsLength);
+  }
+
+  @Override
   public boolean hasNextWithoutConstraint() throws IOException {
     return !timeHeap.isEmpty();
   }
@@ -114,9 +243,4 @@ public class UDTFJoinDataSet extends QueryDataSet
 
     return rowRecord;
   }
-
-  //  @Override
-  //  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) {
-  //    throw new NotImplementedException();
-  //  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 34c1483..7816169 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -28,11 +28,13 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -269,4 +271,35 @@ public class QueryDataSetUtils {
     }
     return values;
   }
+
+  public static TSQueryDataSet packBuffer(
+      TSQueryDataSet tsQueryDataSet,
+      PublicBAOS timeBAOS,
+      PublicBAOS[] valueBAOSList,
+      PublicBAOS[] bitmapBAOSList,
+      int columnsLength) {
+    ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
+    timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
+    timeBuffer.flip();
+    tsQueryDataSet.setTime(timeBuffer);
+
+    List<ByteBuffer> valueBufferList = new ArrayList<>();
+    List<ByteBuffer> bitmapBufferList = new ArrayList<>();
+    for (int i = 0; i < columnsLength; ++i) {
+      putPBOSToBuffer(valueBAOSList, valueBufferList, i);
+      putPBOSToBuffer(bitmapBAOSList, bitmapBufferList, i);
+    }
+    tsQueryDataSet.setValueList(valueBufferList);
+    tsQueryDataSet.setBitmapList(bitmapBufferList);
+
+    return tsQueryDataSet;
+  }
+
+  public static void putPBOSToBuffer(
+      PublicBAOS[] bitmapBAOSList, List<ByteBuffer> bitmapBufferList, int tsIndex) {
+    ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapBAOSList[tsIndex].size());
+    bitmapBuffer.put(bitmapBAOSList[tsIndex].getBuf(), 0, bitmapBAOSList[tsIndex].size());
+    bitmapBuffer.flip();
+    bitmapBufferList.add(bitmapBuffer);
+  }
 }

[iotdb] 01/06: fix bug when splitting

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4f9d53acd8dbba0e2364ef9377e11fb872a18f58
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Dec 1 16:52:29 2021 +0800

    fix bug when splitting
---
 .../query/dataset/udf/UDTFAlignByTimeDataSet.java  |  6 ++--
 .../iotdb/db/query/dataset/udf/UDTFDataSet.java    |  6 ++--
 .../db/query/dataset/udf/UDTFFragmentDataSet.java  | 11 ++++---
 .../query/dataset/udf/UDTFFragmentDataSetTask.java | 17 +++++-----
 .../db/query/dataset/udf/UDTFJoinDataSet.java      |  3 +-
 .../iotdb/db/query/expression/Expression.java      |  3 ++
 .../query/expression/binary/BinaryExpression.java  | 14 +++++++++
 .../db/query/expression/unary/ConstantOperand.java |  7 +++++
 .../query/expression/unary/FunctionExpression.java | 18 +++++++++++
 .../query/expression/unary/NegationExpression.java |  9 ++++++
 .../query/expression/unary/TimeSeriesOperand.java  |  7 +++++
 .../pool/DataSetFragmentExecutionPoolManager.java  |  8 +++--
 .../db/query/udf/core/layer/LayerBuilder.java      | 10 +++---
 .../query/udf/core/layer/RawQueryInputLayer.java   | 36 ++++++++++++----------
 14 files changed, 111 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index d3bc678..b0b6a3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -80,9 +81,10 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
   }
 
   /** for data set fragment */
-  protected UDTFAlignByTimeDataSet(LayerPointReader[] transformers)
+  protected UDTFAlignByTimeDataSet(
+      RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers)
       throws QueryProcessException, IOException {
-    super(transformers);
+    super(rawQueryInputLayer, transformers);
     initTimeHeap();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
index 5700336..3e523b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
@@ -128,12 +128,12 @@ public abstract class UDTFDataSet extends QueryDataSet {
   }
 
   /** for data set fragment */
-  protected UDTFDataSet(LayerPointReader[] transformers) {
-    // The following 3 fields are useless because they are recorded in their parent data set.
+  protected UDTFDataSet(RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers) {
+    // The following 2 fields are useless.
     queryId = -1;
     udtfPlan = null;
-    rawQueryInputLayer = null;
 
+    this.rawQueryInputLayer = rawQueryInputLayer;
     this.transformers = transformers;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
index 50c646a..8d1cbbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.db.query.dataset.udf;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.pool.DataSetFragmentExecutionPoolManager;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +33,7 @@ import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
+public class UDTFFragmentDataSet extends QueryDataSet {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSet.class);
 
@@ -39,6 +41,7 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
   private static final DataSetFragmentExecutionPoolManager
       DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER = DataSetFragmentExecutionPoolManager.getInstance();
 
+  private final QueryDataSet fragmentDataSet;
   private final BlockingQueue<Object[]> productionBlockingQueue;
 
   private RowRecord[] rowRecords = null;
@@ -47,9 +50,9 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
 
   private boolean hasNextRowRecords = true;
 
-  public UDTFFragmentDataSet(LayerPointReader[] transformers)
+  public UDTFFragmentDataSet(RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers)
       throws QueryProcessException, IOException {
-    super(transformers);
+    fragmentDataSet = new UDTFAlignByTimeDataSet(rawQueryInputLayer, transformers);
     productionBlockingQueue = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
     submitTask();
   }
@@ -105,7 +108,7 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
   private void submitTask() {
     if (productionBlockingQueue.remainingCapacity() > 0) {
       DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER.submit(
-          new UDTFFragmentDataSetTask(fetchSize, this, productionBlockingQueue));
+          new UDTFFragmentDataSetTask(fetchSize, fragmentDataSet, productionBlockingQueue));
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
index fb50dea..62452dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.dataset.udf;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,22 +33,20 @@ public class UDTFFragmentDataSetTask extends WrappedRunnable {
   private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSetTask.class);
 
   private final int fetchSize;
-  private final UDTFFragmentDataSet fragmentDataSet;
+  private final QueryDataSet queryDataSet;
 
   // there are 3 elements in Object[].
   // [0]: RowRecord[] or Throwable.
   // [2]: Integer. actual length of produced row records in [0]. note that the element is -1 when
   // the [0] element is a Throwable.
-  // [1]: Boolean. true if the fragmentDataSet still has next RowRecord to be consumed, otherwise
+  // [1]: Boolean. true if the queryDataSet still has next RowRecord to be consumed, otherwise
   // false. note that the element is false when the [0] element is a Throwable.
   private final BlockingQueue<Object[]> productionBlockingQueue;
 
   public UDTFFragmentDataSetTask(
-      int fetchSize,
-      UDTFFragmentDataSet fragmentDataSet,
-      BlockingQueue<Object[]> productionBlockingQueue) {
+      int fetchSize, QueryDataSet queryDataSet, BlockingQueue<Object[]> productionBlockingQueue) {
     this.fetchSize = fetchSize;
-    this.fragmentDataSet = fragmentDataSet;
+    this.queryDataSet = queryDataSet;
     this.productionBlockingQueue = productionBlockingQueue;
   }
 
@@ -56,13 +55,13 @@ public class UDTFFragmentDataSetTask extends WrappedRunnable {
     try {
       int rowRecordCount = 0;
       RowRecord[] rowRecords = new RowRecord[fetchSize];
-      while (rowRecordCount < fetchSize && fragmentDataSet.hasNextWithoutConstraint()) {
-        rowRecords[rowRecordCount++] = fragmentDataSet.nextWithoutConstraint();
+      while (rowRecordCount < fetchSize && queryDataSet.hasNextWithoutConstraint()) {
+        rowRecords[rowRecordCount++] = queryDataSet.nextWithoutConstraint();
       }
 
       // if a task is submitted, there must be free space in the queue
       productionBlockingQueue.put(
-          new Object[] {rowRecords, rowRecordCount, fragmentDataSet.hasNextWithoutConstraint()});
+          new Object[] {rowRecords, rowRecordCount, queryDataSet.hasNextWithoutConstraint()});
     } catch (Throwable e) {
       onThrowable(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index ca2ef3a..323e1c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
 
-// TODO: performances joining in pool, packing row records while calculating
 public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
 
   private final UDTFFragmentDataSet[] fragmentDataSets;
@@ -68,7 +67,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
     timeHeap = new TimeSelector(resultColumnsLength << 1, true);
 
     for (int i = 0; i < resultColumnsLength; ++i) {
-      UDTFDataSet fragmentDataSet = fragmentDataSets[i];
+      QueryDataSet fragmentDataSet = fragmentDataSets[i];
       if (fragmentDataSet.hasNextWithoutConstraint()) {
         rowRecordsCache[i] = fragmentDataSet.nextWithoutConstraint();
         timeHeap.add(rowRecordsCache[i].getTimestamp());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 27df69b..6284c02 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -96,6 +96,9 @@ public abstract class Expression {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  public abstract Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap);
+
   /** Sub-classes should override this method indicating if the expression is a constant operand */
   protected abstract boolean isConstantOperandInternal();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 176b746..9fc05dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -188,6 +188,20 @@ public abstract class BinaryExpression extends Expression {
       LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader);
 
   @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    if (intermediateLayer != null) {
+      return intermediateLayer.getFragmentDataSetIndex();
+    }
+
+    Integer index = leftExpression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+    return index != null
+        ? index
+        : rightExpression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+  }
+
+  @Override
   public final String getExpressionStringInternal() {
     StringBuilder builder = new StringBuilder();
     if (leftExpression instanceof BinaryExpression) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 0a58321..f55c82c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -102,6 +102,13 @@ public class ConstantOperand extends Expression {
   }
 
   @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    return intermediateLayer == null ? null : intermediateLayer.getFragmentDataSetIndex();
+  }
+
+  @Override
   public String getExpressionStringInternal() {
     return valueString;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 2217fe6..b9e8d7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -293,6 +293,24 @@ public class FunctionExpression extends Expression {
     }
   }
 
+  @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    if (intermediateLayer != null) {
+      return intermediateLayer.getFragmentDataSetIndex();
+    }
+
+    for (Expression expression : expressions) {
+      Integer index = expression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+      if (index != null) {
+        return index;
+      }
+    }
+
+    return null;
+  }
+
   public List<PartialPath> getPaths() {
     if (paths == null) {
       paths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 39f4bab..c57d8e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -138,6 +138,15 @@ public class NegationExpression extends Expression {
   }
 
   @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    return intermediateLayer != null
+        ? intermediateLayer.getFragmentDataSetIndex()
+        : expression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+  }
+
+  @Override
   public String getExpressionStringInternal() {
     return "-" + expression.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 9c65d48..ec2a4de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -116,6 +116,13 @@ public class TimeSeriesOperand extends Expression {
                 this, queryId, memoryBudgetInMB, fragmentDataSetIndex, parentLayerPointReader));
   }
 
+  @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    return intermediateLayer == null ? null : intermediateLayer.getFragmentDataSetIndex();
+  }
+
   public String getExpressionStringInternal() {
     return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : path.getFullPath();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
index 0705380..c62e415 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
@@ -35,7 +35,9 @@ public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager {
   private DataSetFragmentExecutionPoolManager() {
     pool =
         IoTDBThreadPoolFactory.newFixedThreadPool(
-            IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(),
+            Math.min(
+                Runtime.getRuntime().availableProcessors(),
+                IoTDBDescriptor.getInstance().getConfig().getMaxConcurrentSubQueryThread()),
             ThreadName.QUERY_FRAGMENT_SERVICE.getName());
   }
 
@@ -58,7 +60,9 @@ public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager {
     if (pool == null) {
       pool =
           IoTDBThreadPoolFactory.newFixedThreadPool(
-              IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(),
+              Math.min(
+                  Runtime.getRuntime().availableProcessors(),
+                  IoTDBDescriptor.getInstance().getConfig().getMaxConcurrentSubQueryThread()),
               ThreadName.QUERY_FRAGMENT_SERVICE.getName());
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index 1541829..a7b0582 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -93,12 +93,9 @@ public class LayerBuilder {
   public LayerBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
     for (int i = 0, n = resultColumnExpressions.length; i < n; ++i) {
       // resultColumnExpressions[i] -> the index of the fragment it belongs to
-      int fragmentDataSetIndex;
-      IntermediateLayer intermediateLayer =
-          expressionIntermediateLayerMap.get(resultColumnExpressions[i]);
-      if (intermediateLayer != null) {
-        fragmentDataSetIndex = intermediateLayer.getFragmentDataSetIndex();
-      } else {
+      Integer fragmentDataSetIndex =
+          resultColumnExpressions[i].tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+      if (fragmentDataSetIndex == null) {
         fragmentDataSetIndex = fragmentDataSetIndexToLayerPointReaders.size();
         fragmentDataSetIndexToLayerPointReaders.add(new ArrayList<>());
       }
@@ -150,6 +147,7 @@ public class LayerBuilder {
     for (int i = 0; i < n; ++i) {
       fragmentDataSets[i] =
           new UDTFFragmentDataSet(
+              rawTimeSeriesInputLayer,
               fragmentDataSetIndexToLayerPointReaders.get(i).toArray(new LayerPointReader[0]));
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index 55d8aca..5efef6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -87,7 +87,9 @@ public class RawQueryInputLayer {
   }
 
   public void updateRowRecordListEvictionUpperBound() {
-    rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
+    synchronized (rowRecordList) {
+      rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
+    }
   }
 
   public LayerPointReader constructPointReader(int columnIndex) {
@@ -125,27 +127,29 @@ public class RawQueryInputLayer {
         return true;
       }
 
-      for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
-        Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
-        if (rowRecordCandidate[columnIndex] != null) {
-          hasCachedRowRecord = true;
-          cachedRowRecord = rowRecordCandidate;
-          currentRowIndex = i;
-          break;
-        }
-      }
-
-      if (!hasCachedRowRecord) {
-        while (queryDataSet.hasNextRowInObjects()) {
-          Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
-          rowRecordList.put(rowRecordCandidate);
+      synchronized (rowRecordList) {
+        for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
+          Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
           if (rowRecordCandidate[columnIndex] != null) {
             hasCachedRowRecord = true;
             cachedRowRecord = rowRecordCandidate;
-            currentRowIndex = rowRecordList.size() - 1;
+            currentRowIndex = i;
             break;
           }
         }
+
+        if (!hasCachedRowRecord) {
+          while (queryDataSet.hasNextRowInObjects()) {
+            Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
+            rowRecordList.put(rowRecordCandidate);
+            if (rowRecordCandidate[columnIndex] != null) {
+              hasCachedRowRecord = true;
+              cachedRowRecord = rowRecordCandidate;
+              currentRowIndex = rowRecordList.size() - 1;
+              break;
+            }
+          }
+        }
       }
 
       return hasCachedRowRecord;

[iotdb] 02/06: init QueryDataSet and fix bug

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9086826929d0455d34c1aabe82dcb3fe2c513225
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Dec 1 17:35:47 2021 +0800

    init QueryDataSet and fix bug
---
 .../db/query/dataset/udf/UDTFAlignByTimeDataSet.java  |  2 +-
 .../iotdb/db/query/dataset/udf/UDTFJoinDataSet.java   | 19 ++++++++++---------
 .../iotdb/db/query/udf/core/layer/LayerBuilder.java   |  8 ++++++--
 .../iotdb/tsfile/read/query/dataset/QueryDataSet.java | 18 ++++++++++++++++++
 4 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index b0b6a3e..7572516 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -111,7 +111,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
   public QueryDataSet executeInFragmentsIfPossible() throws QueryProcessException, IOException {
     // TODO make the behaviour of the return value of layerBuilder.generateJoinDataSet() the same as
     // TODO the original dataset
-    return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet() : this;
+    return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet(this) : this;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index 323e1c2..f92f600 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -19,17 +19,15 @@
 
 package org.apache.iotdb.db.query.dataset.udf;
 
-import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
-import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
 
-public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
+public class UDTFJoinDataSet extends QueryDataSet
+//    implements DirectAlignByTimeDataSet
+{
 
   private final UDTFFragmentDataSet[] fragmentDataSets;
 
@@ -51,9 +49,12 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
   private TimeSelector timeHeap;
 
   public UDTFJoinDataSet(
+      UDTFAlignByTimeDataSet udtfAlignByTimeDataSet,
       UDTFFragmentDataSet[] fragmentDataSets,
       int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex)
       throws IOException {
+    super(udtfAlignByTimeDataSet);
+
     this.fragmentDataSets = fragmentDataSets;
     this.resultColumnOutputIndexToFragmentDataSetOutputIndex =
         resultColumnOutputIndexToFragmentDataSetOutputIndex;
@@ -114,8 +115,8 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
     return rowRecord;
   }
 
-  @Override
-  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) {
-    throw new NotImplementedException();
-  }
+  //  @Override
+  //  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) {
+  //    throw new NotImplementedException();
+  //  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index a7b0582..ab60387 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.query.dataset.udf.UDTFAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.udf.UDTFFragmentDataSet;
 import org.apache.iotdb.db.query.dataset.udf.UDTFJoinDataSet;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -141,7 +142,8 @@ public class LayerBuilder {
     return 4 <= fragmentDataSetIndexToLayerPointReaders.size();
   }
 
-  public QueryDataSet generateJoinDataSet() throws QueryProcessException, IOException {
+  public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet udtfAlignByTimeDataSet)
+      throws QueryProcessException, IOException {
     int n = fragmentDataSetIndexToLayerPointReaders.size();
     UDTFFragmentDataSet[] fragmentDataSets = new UDTFFragmentDataSet[n];
     for (int i = 0; i < n; ++i) {
@@ -152,6 +154,8 @@ public class LayerBuilder {
     }
 
     return new UDTFJoinDataSet(
-        fragmentDataSets, resultColumnOutputIndexToFragmentDataSetOutputIndex);
+        udtfAlignByTimeDataSet,
+        fragmentDataSets,
+        resultColumnOutputIndexToFragmentDataSetOutputIndex);
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index e1e2f4f..c418e51 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -50,6 +50,24 @@ public abstract class QueryDataSet {
 
   protected int columnNum;
 
+  public QueryDataSet(QueryDataSet that) {
+    this.paths = that.paths;
+    this.dataTypes = that.dataTypes;
+
+    this.rowLimit = that.rowLimit;
+    this.rowOffset = that.rowOffset;
+    this.alreadyReturnedRowNum = that.alreadyReturnedRowNum;
+    this.fetchSize = that.fetchSize;
+    this.ascending = that.ascending;
+
+    this.endPoint = that.endPoint;
+
+    this.withoutAnyNull = that.withoutAnyNull;
+    this.withoutAllNull = that.withoutAllNull;
+
+    this.columnNum = that.columnNum;
+  }
+
   /** For redirect query. Need keep consistent with EndPoint in rpc.thrift. */
   public static class EndPoint {
     private String ip = null;

[iotdb] 06/06: udf_min_fragment_number_to_trigger_parallel_execution docs

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 834dd92e22ae30a21b0e89d70c76e6935f346223
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Dec 2 12:16:25 2021 +0800

    udf_min_fragment_number_to_trigger_parallel_execution docs
---
 docs/UserGuide/Advanced-Features/UDF-User-Defined-Function.md    | 3 ++-
 docs/zh/UserGuide/Advanced-Features/UDF-User-Defined-Function.md | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/docs/UserGuide/Advanced-Features/UDF-User-Defined-Function.md b/docs/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
index 21bd6cd..72db91b 100644
--- a/docs/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
+++ b/docs/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
@@ -510,7 +510,8 @@ For more user permissions related content, please refer to [Account Management S
 
 ## Configurable Properties
 
-When querying by a UDF, IoTDB may prompt that there is insufficient memory. You can resolve the issue by configuring `udf_initial_byte_array_length_for_memory_control`, `udf_memory_budget_in_mb` and `udf_reader_transformer_collector_memory_proportion` in `iotdb-engine.properties` and restarting the server.
+* When querying by a UDF, IoTDB may prompt that there is insufficient memory. You can resolve the issue by configuring `udf_initial_byte_array_length_for_memory_control`, `udf_memory_budget_in_mb` and `udf_reader_transformer_collector_memory_proportion` in `iotdb-engine.properties` and restarting the server.
+* A UDTFPlan can be split into several fragment plans. When the number of fragment plans is at least `udf_min_fragment_number_to_trigger_parallel_execution`, the executor triggers parallel execution of the fragment plans. The property should be an integer larger than 1. The default value is 2.
 
 
 
diff --git a/docs/zh/UserGuide/Advanced-Features/UDF-User-Defined-Function.md b/docs/zh/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
index 1ea763b..d1a2811 100644
--- a/docs/zh/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
+++ b/docs/zh/UserGuide/Advanced-Features/UDF-User-Defined-Function.md
@@ -453,7 +453,8 @@ SHOW FUNCTIONS
 
 ## 配置项
 
-在 SQL 语句中使用自定义函数时,可能提示内存不足。这种情况下,您可以通过更改配置文件`iotdb-engine.properties`中的`udf_initial_byte_array_length_for_memory_control`,`udf_memory_budget_in_mb`和`udf_reader_transformer_collector_memory_proportion`并重启服务来解决此问题。
+* 在 SQL 语句中使用自定义函数时,可能提示内存不足。这种情况下,您可以通过更改配置文件`iotdb-engine.properties`中的`udf_initial_byte_array_length_for_memory_control`,`udf_memory_budget_in_mb`和`udf_reader_transformer_collector_memory_proportion`并重启服务来解决此问题。
+* UDTFPlan 可以拆分成多个分片计划,当分片计划的数量不少于`udf_min_fragment_number_to_trigger_parallel_execution`时,执行器会触发分片计划的并行执行。该属性应该是一个大于 1 的整数,默认值为 2。
 
 ## 贡献 UDF
 

[iotdb] 04/06: pass tests when canBeSplitIntoFragments := 2 <= fragmentDataSetIndexToLayerPointReaders.size()

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f678504eaea088f3827a5e593ff7e04e54a13c2e
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Dec 2 11:17:53 2021 +0800

    pass tests when canBeSplitIntoFragments := 2 <= fragmentDataSetIndexToLayerPointReaders.size()
---
 .../db/query/dataset/udf/UDTFJoinDataSet.java      | 262 ++++++++++++---------
 .../db/query/udf/core/layer/LayerBuilder.java      |   2 +-
 2 files changed, 147 insertions(+), 117 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index 1e7bd02..b838a0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -19,15 +19,19 @@
 
 package org.apache.iotdb.db.query.dataset.udf;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
 
@@ -71,7 +75,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
   private void initTimeHeap() throws IOException {
     timeHeap = new TimeSelector(resultColumnsLength << 1, true);
 
-    for (int i = 0; i < resultColumnsLength; ++i) {
+    for (int i = 0, n = fragmentDataSets.length; i < n; ++i) {
       QueryDataSet fragmentDataSet = fragmentDataSets[i];
       if (fragmentDataSet.hasNextWithoutConstraint()) {
         rowRecordsCache[i] = fragmentDataSet.nextWithoutConstraint();
@@ -81,8 +85,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
   }
 
   @Override
-  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
-      throws IOException, QueryProcessException {
+  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException {
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
 
     PublicBAOS timeBAOS = new PublicBAOS();
@@ -94,112 +97,123 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
     }
     int[] currentBitmapList = new int[resultColumnsLength];
 
-    //    int rowCount = 0;
-    //    while (rowCount < fetchSize
-    //        && (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit)
-    //        && !timeHeap.isEmpty()) {
-    //
-    //      long minTime = timeHeap.pollFirst();
-    //      if (rowOffset == 0) {
-    //        timeBAOS.write(BytesUtils.longToBytes(minTime));
-    //      }
-    //
-    //      for (int i = 0; i < resultColumnsLength; ++i) {
-    //        LayerPointReader reader = transformers[i];
-    //
-    //        if (!reader.next() || reader.currentTime() != minTime) {
-    //          if (rowOffset == 0) {
-    //            currentBitmapList[i] = (currentBitmapList[i] << 1);
-    //          }
-    //          continue;
-    //        }
-    //
-    //        if (rowOffset == 0) {
-    //          currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG;
-    //          TSDataType type = reader.getDataType();
-    //          switch (type) {
-    //            case INT32:
-    //              int intValue = reader.currentInt();
-    //              ReadWriteIOUtils.write(
-    //                  encoder != null && encoder.needEncode(minTime)
-    //                      ? encoder.encodeInt(intValue, minTime)
-    //                      : intValue,
-    //                  valueBAOSList[i]);
-    //              break;
-    //            case INT64:
-    //              long longValue = reader.currentLong();
-    //              ReadWriteIOUtils.write(
-    //                  encoder != null && encoder.needEncode(minTime)
-    //                      ? encoder.encodeLong(longValue, minTime)
-    //                      : longValue,
-    //                  valueBAOSList[i]);
-    //              break;
-    //            case FLOAT:
-    //              float floatValue = reader.currentFloat();
-    //              ReadWriteIOUtils.write(
-    //                  encoder != null && encoder.needEncode(minTime)
-    //                      ? encoder.encodeFloat(floatValue, minTime)
-    //                      : floatValue,
-    //                  valueBAOSList[i]);
-    //              break;
-    //            case DOUBLE:
-    //              double doubleValue = reader.currentDouble();
-    //              ReadWriteIOUtils.write(
-    //                  encoder != null && encoder.needEncode(minTime)
-    //                      ? encoder.encodeDouble(doubleValue, minTime)
-    //                      : doubleValue,
-    //                  valueBAOSList[i]);
-    //              break;
-    //            case BOOLEAN:
-    //              ReadWriteIOUtils.write(reader.currentBoolean(), valueBAOSList[i]);
-    //              break;
-    //            case TEXT:
-    //              ReadWriteIOUtils.write(reader.currentBinary(), valueBAOSList[i]);
-    //              break;
-    //            default:
-    //              throw new UnSupportedDataTypeException(
-    //                  String.format("Data type %s is not supported.", type));
-    //          }
-    //        }
-    //
-    //        reader.readyForNext();
-    //
-    //        if (reader.next()) {
-    //          timeHeap.add(reader.currentTime());
-    //        }
-    //      }
-    //
-    //      if (rowOffset == 0) {
-    //        ++rowCount;
-    //        if (rowCount % 8 == 0) {
-    //          for (int i = 0; i < resultColumnsLength; ++i) {
-    //            ReadWriteIOUtils.write((byte) currentBitmapList[i], bitmapBAOSList[i]);
-    //            currentBitmapList[i] = 0;
-    //          }
-    //        }
-    //        if (rowLimit > 0) {
-    //          ++alreadyReturnedRowNum;
-    //        }
-    //      } else {
-    //        --rowOffset;
-    //      }
-    //
-    //      rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
-    //    }
-    //
-    //    /*
-    //     * feed the bitmap with remaining 0 in the right
-    //     * if current bitmap is 00011111 and remaining is 3, after feeding the bitmap is 11111000
-    //     */
-    //    if (rowCount > 0) {
-    //      int remaining = rowCount % 8;
-    //      if (remaining != 0) {
-    //        for (int i = 0; i < resultColumnsLength; ++i) {
-    //          ReadWriteIOUtils.write(
-    //              (byte) (currentBitmapList[i] << (8 - remaining)), bitmapBAOSList[i]);
-    //        }
-    //      }
-    //    }
+    int rowCount = 0;
+    while (rowCount < fetchSize
+        && (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit)
+        && !timeHeap.isEmpty()) {
+
+      long minTime = timeHeap.pollFirst();
+      if (rowOffset == 0) {
+        timeBAOS.write(BytesUtils.longToBytes(minTime));
+      }
+
+      for (int i = 0; i < resultColumnsLength; ++i) {
+        int[] indexes = resultColumnOutputIndexToFragmentDataSetOutputIndex[i];
+        int fragmentDataSetIndex = indexes[0];
+        int outputColumnIndexInFragmentDataSet = indexes[1];
+
+        if (rowRecordsCache[fragmentDataSetIndex] == null) {
+          if (rowOffset == 0) {
+            currentBitmapList[i] = (currentBitmapList[i] << 1);
+          }
+          continue;
+        }
+
+        RowRecord fragmentRowRecord = rowRecordsCache[fragmentDataSetIndex];
+        if (fragmentRowRecord.getTimestamp() != minTime) {
+          if (rowOffset == 0) {
+            currentBitmapList[i] = (currentBitmapList[i] << 1);
+          }
+          continue;
+        }
+
+        if (rowOffset == 0) {
+          currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG;
+
+          Field field = fragmentRowRecord.getFields().get(outputColumnIndexInFragmentDataSet);
+          if (field == null || field.getDataType() == null) {
+            currentBitmapList[i] = (currentBitmapList[i] << 1);
+            continue;
+          }
+
+          TSDataType type = field.getDataType();
+          switch (type) {
+            case INT32:
+              int intValue = field.getIntV();
+              ReadWriteIOUtils.write(
+                  encoder != null && encoder.needEncode(minTime)
+                      ? encoder.encodeInt(intValue, minTime)
+                      : intValue,
+                  valueBAOSList[i]);
+              break;
+            case INT64:
+              long longValue = field.getLongV();
+              ReadWriteIOUtils.write(
+                  encoder != null && encoder.needEncode(minTime)
+                      ? encoder.encodeLong(longValue, minTime)
+                      : longValue,
+                  valueBAOSList[i]);
+              break;
+            case FLOAT:
+              float floatValue = field.getFloatV();
+              ReadWriteIOUtils.write(
+                  encoder != null && encoder.needEncode(minTime)
+                      ? encoder.encodeFloat(floatValue, minTime)
+                      : floatValue,
+                  valueBAOSList[i]);
+              break;
+            case DOUBLE:
+              double doubleValue = field.getDoubleV();
+              ReadWriteIOUtils.write(
+                  encoder != null && encoder.needEncode(minTime)
+                      ? encoder.encodeDouble(doubleValue, minTime)
+                      : doubleValue,
+                  valueBAOSList[i]);
+              break;
+            case BOOLEAN:
+              ReadWriteIOUtils.write(field.getBoolV(), valueBAOSList[i]);
+              break;
+            case TEXT:
+              ReadWriteIOUtils.write(field.getBinaryV(), valueBAOSList[i]);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", type));
+          }
+        }
+      }
+
+      updateRowRecordsCache(minTime);
+
+      if (rowOffset == 0) {
+        ++rowCount;
+        if (rowCount % 8 == 0) {
+          for (int i = 0; i < resultColumnsLength; ++i) {
+            ReadWriteIOUtils.write((byte) currentBitmapList[i], bitmapBAOSList[i]);
+            currentBitmapList[i] = 0;
+          }
+        }
+        if (rowLimit > 0) {
+          ++alreadyReturnedRowNum;
+        }
+      } else {
+        --rowOffset;
+      }
+    }
+
+    /*
+     * feed the bitmap with remaining 0 in the right
+     * if current bitmap is 00011111 and remaining is 3, after feeding the bitmap is 11111000
+     */
+    if (rowCount > 0) {
+      int remaining = rowCount % 8;
+      if (remaining != 0) {
+        for (int i = 0; i < resultColumnsLength; ++i) {
+          ReadWriteIOUtils.write(
+              (byte) (currentBitmapList[i] << (8 - remaining)), bitmapBAOSList[i]);
+        }
+      }
+    }
 
     return QueryDataSetUtils.packBuffer(
         tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList, resultColumnsLength);
@@ -232,15 +246,31 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
       }
 
       rowRecord.addField(fragmentRowRecord.getFields().get(outputColumnIndexInFragmentDataSet));
-      rowRecordsCache[fragmentDataSetIndex] = null;
+    }
+
+    updateRowRecordsCache(minTime);
+
+    return rowRecord;
+  }
+
+  private void updateRowRecordsCache(long minTime) {
+    for (int i = 0, n = fragmentDataSets.length; i < n; ++i) {
+      if (rowRecordsCache[i] == null) {
+        continue;
+      }
+
+      RowRecord fragmentRowRecord = rowRecordsCache[i];
+      if (fragmentRowRecord.getTimestamp() != minTime) {
+        continue;
+      }
+
+      rowRecordsCache[i] = null;
 
-      if (fragmentDataSets[fragmentDataSetIndex].hasNextWithoutConstraint()) {
-        fragmentRowRecord = fragmentDataSets[fragmentDataSetIndex].nextWithoutConstraint();
-        rowRecordsCache[fragmentDataSetIndex] = fragmentRowRecord;
+      if (fragmentDataSets[i].hasNextWithoutConstraint()) {
+        fragmentRowRecord = fragmentDataSets[i].nextWithoutConstraint();
+        rowRecordsCache[i] = fragmentRowRecord;
         timeHeap.add(fragmentRowRecord.getTimestamp());
       }
     }
-
-    return rowRecord;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index ab60387..a61d872 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -139,7 +139,7 @@ public class LayerBuilder {
 
   /** TODO: make it configurable */
   public boolean canBeSplitIntoFragments() {
-    return 4 <= fragmentDataSetIndexToLayerPointReaders.size();
+    return 2 <= fragmentDataSetIndexToLayerPointReaders.size();
   }
 
   public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet udtfAlignByTimeDataSet)

[iotdb] 05/06: udf_min_fragment_number_to_trigger_parallel_execution

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 65e6715bbfedfd4ab492c50181d65ae6f471695a
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Dec 2 12:10:21 2021 +0800

    udf_min_fragment_number_to_trigger_parallel_execution
---
 .../src/assembly/resources/conf/iotdb-engine.properties |  5 +++++
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 17 +++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 13 +++++++++++++
 .../iotdb/db/query/udf/core/layer/LayerBuilder.java     |  8 ++++++--
 4 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index aad2f9d..c736bd1 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -828,6 +828,11 @@ timestamp_precision=ms
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # udf_root_dir=ext/udf
 
+# A UDTFPlan can be split into several fragment plans. When the number of fragment plans is at least
+# udf_min_fragment_number_to_trigger_parallel_execution, the executor triggers parallel execution
+# of the fragment plans. The property should be an integer larger than 1.
+# udf_min_fragment_number_to_trigger_parallel_execution=2
+
 ####################
 ### Trigger Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0ef9fbe..6038fb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -740,6 +740,13 @@ public class IoTDBConfig {
 
   private float udfCollectorMemoryBudgetInMB = (float) (1.0 / 3 * udfMemoryBudgetInMB);
 
+  /**
+   * A UDTFPlan can be split into several fragment plans. When the number of fragment plans is at
+   * least udfMinFragmentNumberToTriggerParallelExecution, the executor triggers parallel execution
+   * of the fragment plans.
+   */
+  private int udfMinFragmentNumberToTriggerParallelExecution = 2;
+
   // time in nanosecond precision when starting up
   private long startUpNanosecond = System.nanoTime();
 
@@ -815,6 +822,16 @@ public class IoTDBConfig {
     this.udfInitialByteArrayLengthForMemoryControl = udfInitialByteArrayLengthForMemoryControl;
   }
 
+  public int getUdfMinFragmentNumberToTriggerParallelExecution() {
+    return udfMinFragmentNumberToTriggerParallelExecution;
+  }
+
+  public void setUdfMinFragmentNumberToTriggerParallelExecution(
+      int udfMinFragmentNumberToTriggerParallelExecution) {
+    this.udfMinFragmentNumberToTriggerParallelExecution =
+        udfMinFragmentNumberToTriggerParallelExecution;
+  }
+
   public int getConcurrentWritingTimePartition() {
     return concurrentWritingTimePartition;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 84f8051..a42572f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1200,6 +1200,13 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "select_into_insert_tablet_plan_row_limit",
                   String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+
+      // udf
+      conf.setUdfMinFragmentNumberToTriggerParallelExecution(
+          Integer.parseInt(
+              properties.getProperty(
+                  "udf_min_fragment_number_to_trigger_parallel_execution",
+                  String.valueOf(conf.getUdfMinFragmentNumberToTriggerParallelExecution()))));
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
     }
@@ -1328,6 +1335,12 @@ public class IoTDBDescriptor {
                 + readerTransformerCollectorMemoryProportion);
       }
     }
+
+    conf.setUdfMinFragmentNumberToTriggerParallelExecution(
+        Integer.parseInt(
+            properties.getProperty(
+                "udf_min_fragment_number_to_trigger_parallel_execution",
+                String.valueOf(conf.getUdfMinFragmentNumberToTriggerParallelExecution()))));
   }
 
   private void loadTriggerProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index a61d872..05f1b8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.dataset.udf.UDTFAlignByTimeDataSet;
@@ -38,6 +40,8 @@ import java.util.Map;
 
 public class LayerBuilder {
 
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
   private final long queryId;
   private final UDTFPlan udtfPlan;
   private final RawQueryInputLayer rawTimeSeriesInputLayer;
@@ -137,9 +141,9 @@ public class LayerBuilder {
     return resultColumnPointReaders;
   }
 
-  /** TODO: make it configurable */
   public boolean canBeSplitIntoFragments() {
-    return 2 <= fragmentDataSetIndexToLayerPointReaders.size();
+    return Math.max(2, CONFIG.getUdfMinFragmentNumberToTriggerParallelExecution()) // floor of 2
+        <= fragmentDataSetIndexToLayerPointReaders.size();
   }
 
   public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet udtfAlignByTimeDataSet)