You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/30 11:06:42 UTC
[iotdb] 03/03: fragment task dataset & join dataset
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1da1a6bad3f46f6f1caf34f37ff6c48b1c9ee68a
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Nov 30 19:05:42 2021 +0800
fragment task dataset & join dataset
---
.../query/dataset/udf/UDTFAlignByTimeDataSet.java | 2 +
.../db/query/dataset/udf/UDTFFragmentDataSet.java | 86 ++++++++++++++++++++++
.../query/dataset/udf/UDTFFragmentDataSetTask.java | 85 +++++++++++++++++++++
.../db/query/dataset/udf/UDTFJoinDataSet.java | 5 +-
.../db/query/udf/core/layer/LayerBuilder.java | 3 +-
5 files changed, 177 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index 31e2d9b..d3bc678 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -107,6 +107,8 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
}
+ /**
+ * Returns the dataset that should serve this query: the join dataset built by {@code
+ * layerBuilder.generateJoinDataSet()} when {@link #canBeSplitIntoFragments()} is true, otherwise
+ * this dataset itself.
+ */
public QueryDataSet executeInFragmentsIfPossible() throws QueryProcessException, IOException {
+ // TODO: the dataset returned by layerBuilder.generateJoinDataSet() should behave the same as
+ // this original (non-fragmented) dataset.
return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet() : this;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
index a052bda..50c646a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
@@ -20,14 +20,100 @@
package org.apache.iotdb.db.query.dataset.udf;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.pool.DataSetFragmentExecutionPoolManager;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+/**
+ * Fragment of a UDTF query whose rows are produced by {@link UDTFFragmentDataSetTask}s running in
+ * the pool of {@link DataSetFragmentExecutionPoolManager} and handed to the consumer in batches
+ * through a bounded blocking queue.
+ *
+ * <p>Each queue element is an {@code Object[]} laid out as {@code [RowRecord[] or Throwable,
+ * Integer batchLength, Boolean hasNext]}. A producer task is submitted from the constructor and
+ * then again after each non-final batch is taken, so at most one batch is computed ahead of the
+ * consumer.
+ */
public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
+ private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSet.class);
+
+ // capacity of the queue through which producer tasks hand batches to this dataset
+ private static final int BLOCKING_QUEUE_CAPACITY = 2;
+ private static final DataSetFragmentExecutionPoolManager
+ DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER = DataSetFragmentExecutionPoolManager.getInstance();
+
+ // batches produced by UDTFFragmentDataSetTask: [RowRecord[] or Throwable, Integer, Boolean]
+ private final BlockingQueue<Object[]> productionBlockingQueue;
+
+ // batch currently being consumed; null before the first batch is taken and once it is used up
+ private RowRecord[] rowRecords = null;
+ // number of valid entries in rowRecords (the array itself may have more slots)
+ private int rowRecordsLength = 0;
+ // index of the last row returned by nextWithoutConstraint(); -1 before the first row of a batch
+ private int rowRecordsIndexConsumed = -1;
+
+ // false once the producer has reported that no further batches will follow
+ private boolean hasNextRowRecords = true;
+
+ /**
+ * Creates the fragment and immediately submits a task that computes its first batch.
+ *
+ * <p>NOTE(review): submitTask() hands {@code this} to a pool thread before the constructor has
+ * returned.
+ */
public UDTFFragmentDataSet(LayerPointReader[] transformers)
throws QueryProcessException, IOException {
super(transformers);
+ productionBlockingQueue = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
+ submitTask();
+ }
+
+ /**
+ * Returns whether another row is available, blocking until the next batch has been produced when
+ * the current one is used up.
+ *
+ * <p>NOTE(review): UDTFFragmentDataSetTask#runMayThrow calls hasNextWithoutConstraint() and
+ * nextWithoutConstraint() on this dataset to produce rows, but dynamic dispatch routes those
+ * calls to these consumer-side overrides. The producer then blocks in
+ * productionBlockingQueue.take() waiting for a batch only it could supply, so no batch is ever
+ * put and the consumer blocks as well. The producer presumably needs to reach the inherited
+ * UDTFAlignByTimeDataSet implementations (e.g. through dedicated methods delegating to super).
+ *
+ * @throws RuntimeException wrapping the cause if a producer task failed or this thread was
+ *     interrupted while waiting
+ */
+ @Override
+ public boolean hasNextWithoutConstraint() {
+ try {
+ return rowRecords != null || tryToTakeNextRowRecords();
+ } catch (InterruptedException e) {
+ return onThrowable(e);
+ }
+ }
+
+ /**
+ * Takes the next batch from the queue (blocking), makes it the current batch and, if the producer
+ * reported more rows, submits the task for the following batch.
+ *
+ * @return true if the new batch contains at least one row, false if the data is exhausted
+ * @throws InterruptedException if interrupted while waiting for the batch
+ */
+ private boolean tryToTakeNextRowRecords() throws InterruptedException {
+ if (!hasNextRowRecords) {
+ return false;
+ }
+
+ Object[] production = productionBlockingQueue.take();
+
+ Object rowRecordArrayOrThrowable = production[0];
+ if (rowRecordArrayOrThrowable instanceof Throwable) {
+ // NOTE(review): hasNextRowRecords stays true and no new task is submitted here, so a later
+ // hasNextWithoutConstraint() call would block forever in take().
+ return onThrowable((Throwable) rowRecordArrayOrThrowable);
+ }
+
+ rowRecords = (RowRecord[]) production[0];
+ rowRecordsLength = (int) production[1];
+ rowRecordsIndexConsumed = -1;
+ hasNextRowRecords = (boolean) production[2];
+
+ if (rowRecordsLength == 0) {
+ // an empty batch is expected only as the final one, i.e. with hasNextRowRecords == false
+ rowRecords = null;
+ // assert !hasNextRowRecords;
+ return false;
+ }
+
+ // prefetch the following batch while the consumer works through this one
+ if (hasNextRowRecords) {
+ submitTask();
+ }
+
+ return true;
+ }
+
+ /**
+ * Returns the next row of the current batch. Callers must first get {@code true} from {@link
+ * #hasNextWithoutConstraint()}. After the last row of the batch is returned, the batch is
+ * released so that the next hasNextWithoutConstraint() call takes a new one from the queue.
+ */
+ @Override
+ public RowRecord nextWithoutConstraint() {
+ RowRecord rowRecord = rowRecords[++rowRecordsIndexConsumed];
+ if (rowRecordsIndexConsumed == rowRecordsLength - 1) {
+ rowRecords = null;
+ }
+ return rowRecord;
+ }
+
+ // submits a producer task for one more batch, but only while the queue has free space
+ private void submitTask() {
+ if (productionBlockingQueue.remainingCapacity() > 0) {
+ DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER.submit(
+ new UDTFFragmentDataSetTask(fetchSize, this, productionBlockingQueue));
+ }
+ }
+
+ /**
+ * Logs the failure, restores the interrupt flag for an InterruptedException and rethrows it
+ * wrapped in a RuntimeException. It never returns normally; the boolean return type only lets
+ * callers write {@code return onThrowable(e)}.
+ */
+ private boolean onThrowable(Throwable throwable) {
+ LOGGER.warn("Error occurred while pulling data in fragment dataset: ", throwable);
+ if (throwable instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new RuntimeException(throwable);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
new file mode 100644
index 0000000..fb50dea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.udf;
+
+import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Producer task that computes one batch of at most {@code fetchSize} rows from a {@link
+ * UDTFFragmentDataSet} on a pool thread and puts it into the dataset's blocking queue. If anything
+ * is thrown, the Throwable is put into the queue instead so that the consumer can rethrow it.
+ */
+public class UDTFFragmentDataSetTask extends WrappedRunnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSetTask.class);
+
+ private final int fetchSize;
+ private final UDTFFragmentDataSet fragmentDataSet;
+
+ // there are 3 elements in each Object[] put into the queue:
+ // [0]: RowRecord[] or Throwable.
+ // [1]: Integer. actual length of the produced row records in [0] (the array itself has
+ // fetchSize slots). the element is -1 when [0] is a Throwable.
+ // [2]: Boolean. true if the fragmentDataSet still has next RowRecord to be consumed, otherwise
+ // false. the element is false when [0] is a Throwable.
+ private final BlockingQueue<Object[]> productionBlockingQueue;
+
+ public UDTFFragmentDataSetTask(
+ int fetchSize,
+ UDTFFragmentDataSet fragmentDataSet,
+ BlockingQueue<Object[]> productionBlockingQueue) {
+ this.fetchSize = fetchSize;
+ this.fragmentDataSet = fragmentDataSet;
+ this.productionBlockingQueue = productionBlockingQueue;
+ }
+
+ /**
+ * Fills one batch and puts it into the queue; on any Throwable, puts the Throwable instead.
+ *
+ * <p>NOTE(review): fragmentDataSet.hasNextWithoutConstraint() and nextWithoutConstraint()
+ * resolve to UDTFFragmentDataSet's consumer-side overrides, which take batches from the very
+ * queue this task is supposed to fill. As written, the first task blocks in take() inside
+ * hasNextWithoutConstraint() and never produces a batch. It presumably needs methods that
+ * delegate to the inherited UDTFAlignByTimeDataSet implementations.
+ */
+ @Override
+ public void runMayThrow() {
+ try {
+ int rowRecordCount = 0;
+ RowRecord[] rowRecords = new RowRecord[fetchSize];
+ while (rowRecordCount < fetchSize && fragmentDataSet.hasNextWithoutConstraint()) {
+ rowRecords[rowRecordCount++] = fragmentDataSet.nextWithoutConstraint();
+ }
+
+ // if a task is submitted, there must be free space in the queue: UDTFFragmentDataSet only
+ // submits while remainingCapacity() > 0, so this put() is not expected to block
+ productionBlockingQueue.put(
+ new Object[] {rowRecords, rowRecordCount, fragmentDataSet.hasNextWithoutConstraint()});
+ } catch (Throwable e) {
+ onThrowable(e);
+ }
+ }
+
+ /**
+ * Logs the failure and hands it to the consumer through the queue as {@code [throwable, -1,
+ * false]}.
+ *
+ * <p>NOTE(review): when {@code throwable} is an InterruptedException, the interrupt flag is
+ * restored before put() is called. put() on the LinkedBlockingQueue created by
+ * UDTFFragmentDataSet acquires its lock interruptibly and so throws immediately. The failure is
+ * then only logged and never reaches the consumer, which may keep waiting in take(). Consider
+ * restoring the flag after the put.
+ */
+ private void onThrowable(Throwable throwable) {
+ LOGGER.error("Error occurred while querying in fragment: ", throwable);
+
+ if (throwable instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+
+ try {
+ productionBlockingQueue.put(new Object[] {throwable, -1, false});
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while putting Throwable into the blocking queue: ", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index b9ac75f..ca2ef3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -32,7 +32,7 @@ import java.io.IOException;
// TODO: performances joining in pool, packing row records while calculating
public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
- private final UDTFDataSet[] fragmentDataSets;
+ private final UDTFFragmentDataSet[] fragmentDataSets;
/**
* Each output column of the UDTFJoinDataSet corresponds to a two-tuple ({@code int[]}) instance
@@ -52,7 +52,8 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
private TimeSelector timeHeap;
public UDTFJoinDataSet(
- UDTFDataSet[] fragmentDataSets, int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex)
+ UDTFFragmentDataSet[] fragmentDataSets,
+ int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex)
throws IOException {
this.fragmentDataSets = fragmentDataSets;
this.resultColumnOutputIndexToFragmentDataSetOutputIndex =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index b46b146..1541829 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.query.udf.core.layer;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
-import org.apache.iotdb.db.query.dataset.udf.UDTFDataSet;
import org.apache.iotdb.db.query.dataset.udf.UDTFFragmentDataSet;
import org.apache.iotdb.db.query.dataset.udf.UDTFJoinDataSet;
import org.apache.iotdb.db.query.expression.Expression;
@@ -147,7 +146,7 @@ public class LayerBuilder {
public QueryDataSet generateJoinDataSet() throws QueryProcessException, IOException {
int n = fragmentDataSetIndexToLayerPointReaders.size();
- UDTFDataSet[] fragmentDataSets = new UDTFDataSet[n];
+ UDTFFragmentDataSet[] fragmentDataSets = new UDTFFragmentDataSet[n];
for (int i = 0; i < n; ++i) {
fragmentDataSets[i] =
new UDTFFragmentDataSet(