You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/30 11:06:42 UTC

[iotdb] 03/03: fragment task dataset & join dataset

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1da1a6bad3f46f6f1caf34f37ff6c48b1c9ee68a
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Nov 30 19:05:42 2021 +0800

    fragment task dataset & join dataset
---
 .../query/dataset/udf/UDTFAlignByTimeDataSet.java  |  2 +
 .../db/query/dataset/udf/UDTFFragmentDataSet.java  | 86 ++++++++++++++++++++++
 .../query/dataset/udf/UDTFFragmentDataSetTask.java | 85 +++++++++++++++++++++
 .../db/query/dataset/udf/UDTFJoinDataSet.java      |  5 +-
 .../db/query/udf/core/layer/LayerBuilder.java      |  3 +-
 5 files changed, 177 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index 31e2d9b..d3bc678 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -107,6 +107,8 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
   }
 
   public QueryDataSet executeInFragmentsIfPossible() throws QueryProcessException, IOException {
+    // TODO: make the dataset returned by layerBuilder.generateJoinDataSet() behave the same as
+    // the original dataset
     return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet() : this;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
index a052bda..50c646a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
@@ -20,14 +20,100 @@
 package org.apache.iotdb.db.query.dataset.udf;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.pool.DataSetFragmentExecutionPoolManager;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSet.class);
+
+  private static final int BLOCKING_QUEUE_CAPACITY = 2;
+  private static final DataSetFragmentExecutionPoolManager
+      DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER = DataSetFragmentExecutionPoolManager.getInstance();
+
+  private final BlockingQueue<Object[]> productionBlockingQueue;
+
+  private RowRecord[] rowRecords = null;
+  private int rowRecordsLength = 0;
+  private int rowRecordsIndexConsumed = -1;
+
+  private boolean hasNextRowRecords = true;
+
   public UDTFFragmentDataSet(LayerPointReader[] transformers)
       throws QueryProcessException, IOException {
     super(transformers);
+    productionBlockingQueue = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
+    submitTask();
+  }
+
+  @Override
+  public boolean hasNextWithoutConstraint() {
+    try {
+      return rowRecords != null || tryToTakeNextRowRecords();
+    } catch (InterruptedException e) {
+      return onThrowable(e);
+    }
+  }
+
+  private boolean tryToTakeNextRowRecords() throws InterruptedException {
+    if (!hasNextRowRecords) {
+      return false;
+    }
+
+    Object[] production = productionBlockingQueue.take();
+
+    Object rowRecordArrayOrThrowable = production[0];
+    if (rowRecordArrayOrThrowable instanceof Throwable) {
+      return onThrowable((Throwable) rowRecordArrayOrThrowable);
+    }
+
+    rowRecords = (RowRecord[]) production[0];
+    rowRecordsLength = (int) production[1];
+    rowRecordsIndexConsumed = -1;
+    hasNextRowRecords = (boolean) production[2];
+
+    if (rowRecordsLength == 0) {
+      rowRecords = null;
+      // assert !hasNextRowRecords;
+      return false;
+    }
+
+    if (hasNextRowRecords) {
+      submitTask();
+    }
+
+    return true;
+  }
+
+  @Override
+  public RowRecord nextWithoutConstraint() {
+    RowRecord rowRecord = rowRecords[++rowRecordsIndexConsumed];
+    if (rowRecordsIndexConsumed == rowRecordsLength - 1) {
+      rowRecords = null;
+    }
+    return rowRecord;
+  }
+
+  private void submitTask() {
+    if (productionBlockingQueue.remainingCapacity() > 0) {
+      DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER.submit(
+          new UDTFFragmentDataSetTask(fetchSize, this, productionBlockingQueue));
+    }
+  }
+
+  private boolean onThrowable(Throwable throwable) {
+    LOGGER.warn("Error occurred while pulling data in fragment dataset: ", throwable);
+    if (throwable instanceof InterruptedException) {
+      Thread.currentThread().interrupt();
+    }
+    throw new RuntimeException(throwable);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
new file mode 100644
index 0000000..fb50dea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.udf;
+
+import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+public class UDTFFragmentDataSetTask extends WrappedRunnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSetTask.class);
+
+  private final int fetchSize;
+  private final UDTFFragmentDataSet fragmentDataSet;
+
+  // there are 3 elements in Object[].
+  // [0]: RowRecord[] or Throwable.
+  // [1]: Integer. actual length of produced row records in [0]. note that the element is -1 when
+  // the [0] element is a Throwable.
+  // [2]: Boolean. true if the fragmentDataSet still has next RowRecord to be consumed, otherwise
+  // false. note that the element is false when the [0] element is a Throwable.
+  private final BlockingQueue<Object[]> productionBlockingQueue;
+
+  public UDTFFragmentDataSetTask(
+      int fetchSize,
+      UDTFFragmentDataSet fragmentDataSet,
+      BlockingQueue<Object[]> productionBlockingQueue) {
+    this.fetchSize = fetchSize;
+    this.fragmentDataSet = fragmentDataSet;
+    this.productionBlockingQueue = productionBlockingQueue;
+  }
+
+  @Override
+  public void runMayThrow() {
+    try {
+      int rowRecordCount = 0;
+      RowRecord[] rowRecords = new RowRecord[fetchSize];
+      while (rowRecordCount < fetchSize && fragmentDataSet.hasNextWithoutConstraint()) {
+        rowRecords[rowRecordCount++] = fragmentDataSet.nextWithoutConstraint();
+      }
+
+      // if a task is submitted, there must be free space in the queue
+      productionBlockingQueue.put(
+          new Object[] {rowRecords, rowRecordCount, fragmentDataSet.hasNextWithoutConstraint()});
+    } catch (Throwable e) {
+      onThrowable(e);
+    }
+  }
+
+  private void onThrowable(Throwable throwable) {
+    LOGGER.error("Error occurred while querying in fragment: ", throwable);
+
+    if (throwable instanceof InterruptedException) {
+      Thread.currentThread().interrupt();
+    }
+
+    try {
+      productionBlockingQueue.put(new Object[] {throwable, -1, false});
+    } catch (InterruptedException e) {
+      LOGGER.error("Interrupted while putting Throwable into the blocking queue: ", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index b9ac75f..ca2ef3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -32,7 +32,7 @@ import java.io.IOException;
 // TODO: performances joining in pool, packing row records while calculating
 public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
 
-  private final UDTFDataSet[] fragmentDataSets;
+  private final UDTFFragmentDataSet[] fragmentDataSets;
 
   /**
    * Each output column of the UDTFJoinDataSet corresponds to a two-tuple ({@code int[]}) instance
@@ -52,7 +52,8 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
   private TimeSelector timeHeap;
 
   public UDTFJoinDataSet(
-      UDTFDataSet[] fragmentDataSets, int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex)
+      UDTFFragmentDataSet[] fragmentDataSets,
+      int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex)
       throws IOException {
     this.fragmentDataSets = fragmentDataSets;
     this.resultColumnOutputIndexToFragmentDataSetOutputIndex =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index b46b146..1541829 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
-import org.apache.iotdb.db.query.dataset.udf.UDTFDataSet;
 import org.apache.iotdb.db.query.dataset.udf.UDTFFragmentDataSet;
 import org.apache.iotdb.db.query.dataset.udf.UDTFJoinDataSet;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -147,7 +146,7 @@ public class LayerBuilder {
 
   public QueryDataSet generateJoinDataSet() throws QueryProcessException, IOException {
     int n = fragmentDataSetIndexToLayerPointReaders.size();
-    UDTFDataSet[] fragmentDataSets = new UDTFDataSet[n];
+    UDTFFragmentDataSet[] fragmentDataSets = new UDTFFragmentDataSet[n];
     for (int i = 0; i < n; ++i) {
       fragmentDataSets[i] =
           new UDTFFragmentDataSet(