You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/12/02 04:18:28 UTC
[iotdb] 01/06: fix bug when spliting
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4f9d53acd8dbba0e2364ef9377e11fb872a18f58
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Dec 1 16:52:29 2021 +0800
fix bug when spliting
---
.../query/dataset/udf/UDTFAlignByTimeDataSet.java | 6 ++--
.../iotdb/db/query/dataset/udf/UDTFDataSet.java | 6 ++--
.../db/query/dataset/udf/UDTFFragmentDataSet.java | 11 ++++---
.../query/dataset/udf/UDTFFragmentDataSetTask.java | 17 +++++-----
.../db/query/dataset/udf/UDTFJoinDataSet.java | 3 +-
.../iotdb/db/query/expression/Expression.java | 3 ++
.../query/expression/binary/BinaryExpression.java | 14 +++++++++
.../db/query/expression/unary/ConstantOperand.java | 7 +++++
.../query/expression/unary/FunctionExpression.java | 18 +++++++++++
.../query/expression/unary/NegationExpression.java | 9 ++++++
.../query/expression/unary/TimeSeriesOperand.java | 7 +++++
.../pool/DataSetFragmentExecutionPoolManager.java | 8 +++--
.../db/query/udf/core/layer/LayerBuilder.java | 10 +++---
.../query/udf/core/layer/RawQueryInputLayer.java | 36 ++++++++++++----------
14 files changed, 111 insertions(+), 44 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index d3bc678..b0b6a3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -80,9 +81,10 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
}
/** for data set fragment */
- protected UDTFAlignByTimeDataSet(LayerPointReader[] transformers)
+ protected UDTFAlignByTimeDataSet(
+ RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers)
throws QueryProcessException, IOException {
- super(transformers);
+ super(rawQueryInputLayer, transformers);
initTimeHeap();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
index 5700336..3e523b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
@@ -128,12 +128,12 @@ public abstract class UDTFDataSet extends QueryDataSet {
}
/** for data set fragment */
- protected UDTFDataSet(LayerPointReader[] transformers) {
- // The following 3 fields are useless because they are recorded in their parent data set.
+ protected UDTFDataSet(RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers) {
+ // The following 2 fields are useless.
queryId = -1;
udtfPlan = null;
- rawQueryInputLayer = null;
+ this.rawQueryInputLayer = rawQueryInputLayer;
this.transformers = transformers;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
index 50c646a..8d1cbbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.db.query.dataset.udf;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.pool.DataSetFragmentExecutionPoolManager;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +33,7 @@ import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
+public class UDTFFragmentDataSet extends QueryDataSet {
private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSet.class);
@@ -39,6 +41,7 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
private static final DataSetFragmentExecutionPoolManager
DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER = DataSetFragmentExecutionPoolManager.getInstance();
+ private final QueryDataSet fragmentDataSet;
private final BlockingQueue<Object[]> productionBlockingQueue;
private RowRecord[] rowRecords = null;
@@ -47,9 +50,9 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
private boolean hasNextRowRecords = true;
- public UDTFFragmentDataSet(LayerPointReader[] transformers)
+ public UDTFFragmentDataSet(RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers)
throws QueryProcessException, IOException {
- super(transformers);
+ fragmentDataSet = new UDTFAlignByTimeDataSet(rawQueryInputLayer, transformers);
productionBlockingQueue = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
submitTask();
}
@@ -105,7 +108,7 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
private void submitTask() {
if (productionBlockingQueue.remainingCapacity() > 0) {
DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER.submit(
- new UDTFFragmentDataSetTask(fetchSize, this, productionBlockingQueue));
+ new UDTFFragmentDataSetTask(fetchSize, fragmentDataSet, productionBlockingQueue));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
index fb50dea..62452dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.dataset.udf;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,22 +33,20 @@ public class UDTFFragmentDataSetTask extends WrappedRunnable {
private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSetTask.class);
private final int fetchSize;
- private final UDTFFragmentDataSet fragmentDataSet;
+ private final QueryDataSet queryDataSet;
// there are 3 elements in Object[].
// [0]: RowRecord[] or Throwable.
// [2]: Integer. actual length of produced row records in [0]. note that the element is -1 when
// the [0] element is a Throwable.
- // [1]: Boolean. true if the fragmentDataSet still has next RowRecord to be consumed, otherwise
+ // [1]: Boolean. true if the queryDataSet still has next RowRecord to be consumed, otherwise
// false. note that the element is false when the [0] element is a Throwable.
private final BlockingQueue<Object[]> productionBlockingQueue;
public UDTFFragmentDataSetTask(
- int fetchSize,
- UDTFFragmentDataSet fragmentDataSet,
- BlockingQueue<Object[]> productionBlockingQueue) {
+ int fetchSize, QueryDataSet queryDataSet, BlockingQueue<Object[]> productionBlockingQueue) {
this.fetchSize = fetchSize;
- this.fragmentDataSet = fragmentDataSet;
+ this.queryDataSet = queryDataSet;
this.productionBlockingQueue = productionBlockingQueue;
}
@@ -56,13 +55,13 @@ public class UDTFFragmentDataSetTask extends WrappedRunnable {
try {
int rowRecordCount = 0;
RowRecord[] rowRecords = new RowRecord[fetchSize];
- while (rowRecordCount < fetchSize && fragmentDataSet.hasNextWithoutConstraint()) {
- rowRecords[rowRecordCount++] = fragmentDataSet.nextWithoutConstraint();
+ while (rowRecordCount < fetchSize && queryDataSet.hasNextWithoutConstraint()) {
+ rowRecords[rowRecordCount++] = queryDataSet.nextWithoutConstraint();
}
// if a task is submitted, there must be free space in the queue
productionBlockingQueue.put(
- new Object[] {rowRecords, rowRecordCount, fragmentDataSet.hasNextWithoutConstraint()});
+ new Object[] {rowRecords, rowRecordCount, queryDataSet.hasNextWithoutConstraint()});
} catch (Throwable e) {
onThrowable(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index ca2ef3a..323e1c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
-// TODO: performances joining in pool, packing row records while calculating
public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
private final UDTFFragmentDataSet[] fragmentDataSets;
@@ -68,7 +67,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
timeHeap = new TimeSelector(resultColumnsLength << 1, true);
for (int i = 0; i < resultColumnsLength; ++i) {
- UDTFDataSet fragmentDataSet = fragmentDataSets[i];
+ QueryDataSet fragmentDataSet = fragmentDataSets[i];
if (fragmentDataSet.hasNextWithoutConstraint()) {
rowRecordsCache[i] = fragmentDataSet.nextWithoutConstraint();
timeHeap.add(rowRecordsCache[i].getTimestamp());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 27df69b..6284c02 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -96,6 +96,9 @@ public abstract class Expression {
return expressionIntermediateLayerMap.get(this);
}
+ public abstract Integer tryToGetFragmentDataSetIndex(
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap);
+
/** Sub-classes should override this method indicating if the expression is a constant operand */
protected abstract boolean isConstantOperandInternal();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 176b746..9fc05dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -188,6 +188,20 @@ public abstract class BinaryExpression extends Expression {
LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader);
@Override
+ public Integer tryToGetFragmentDataSetIndex(
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+ IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+ if (intermediateLayer != null) {
+ return intermediateLayer.getFragmentDataSetIndex();
+ }
+
+ Integer index = leftExpression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+ return index != null
+ ? index
+ : rightExpression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+ }
+
+ @Override
public final String getExpressionStringInternal() {
StringBuilder builder = new StringBuilder();
if (leftExpression instanceof BinaryExpression) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 0a58321..f55c82c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -102,6 +102,13 @@ public class ConstantOperand extends Expression {
}
@Override
+ public Integer tryToGetFragmentDataSetIndex(
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+ IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+ return intermediateLayer == null ? null : intermediateLayer.getFragmentDataSetIndex();
+ }
+
+ @Override
public String getExpressionStringInternal() {
return valueString;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 2217fe6..b9e8d7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -293,6 +293,24 @@ public class FunctionExpression extends Expression {
}
}
+ @Override
+ public Integer tryToGetFragmentDataSetIndex(
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+ IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+ if (intermediateLayer != null) {
+ return intermediateLayer.getFragmentDataSetIndex();
+ }
+
+ for (Expression expression : expressions) {
+ Integer index = expression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+ if (index != null) {
+ return index;
+ }
+ }
+
+ return null;
+ }
+
public List<PartialPath> getPaths() {
if (paths == null) {
paths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 39f4bab..c57d8e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -138,6 +138,15 @@ public class NegationExpression extends Expression {
}
@Override
+ public Integer tryToGetFragmentDataSetIndex(
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+ IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+ return intermediateLayer != null
+ ? intermediateLayer.getFragmentDataSetIndex()
+ : expression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+ }
+
+ @Override
public String getExpressionStringInternal() {
return "-" + expression.toString();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 9c65d48..ec2a4de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -116,6 +116,13 @@ public class TimeSeriesOperand extends Expression {
this, queryId, memoryBudgetInMB, fragmentDataSetIndex, parentLayerPointReader));
}
+ @Override
+ public Integer tryToGetFragmentDataSetIndex(
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+ IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+ return intermediateLayer == null ? null : intermediateLayer.getFragmentDataSetIndex();
+ }
+
public String getExpressionStringInternal() {
return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : path.getFullPath();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
index 0705380..c62e415 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
@@ -35,7 +35,9 @@ public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager {
private DataSetFragmentExecutionPoolManager() {
pool =
IoTDBThreadPoolFactory.newFixedThreadPool(
- IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(),
+ Math.min(
+ Runtime.getRuntime().availableProcessors(),
+ IoTDBDescriptor.getInstance().getConfig().getMaxConcurrentSubQueryThread()),
ThreadName.QUERY_FRAGMENT_SERVICE.getName());
}
@@ -58,7 +60,9 @@ public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager {
if (pool == null) {
pool =
IoTDBThreadPoolFactory.newFixedThreadPool(
- IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(),
+ Math.min(
+ Runtime.getRuntime().availableProcessors(),
+ IoTDBDescriptor.getInstance().getConfig().getMaxConcurrentSubQueryThread()),
ThreadName.QUERY_FRAGMENT_SERVICE.getName());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index 1541829..a7b0582 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -93,12 +93,9 @@ public class LayerBuilder {
public LayerBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
for (int i = 0, n = resultColumnExpressions.length; i < n; ++i) {
// resultColumnExpressions[i] -> the index of the fragment it belongs to
- int fragmentDataSetIndex;
- IntermediateLayer intermediateLayer =
- expressionIntermediateLayerMap.get(resultColumnExpressions[i]);
- if (intermediateLayer != null) {
- fragmentDataSetIndex = intermediateLayer.getFragmentDataSetIndex();
- } else {
+ Integer fragmentDataSetIndex =
+ resultColumnExpressions[i].tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+ if (fragmentDataSetIndex == null) {
fragmentDataSetIndex = fragmentDataSetIndexToLayerPointReaders.size();
fragmentDataSetIndexToLayerPointReaders.add(new ArrayList<>());
}
@@ -150,6 +147,7 @@ public class LayerBuilder {
for (int i = 0; i < n; ++i) {
fragmentDataSets[i] =
new UDTFFragmentDataSet(
+ rawTimeSeriesInputLayer,
fragmentDataSetIndexToLayerPointReaders.get(i).toArray(new LayerPointReader[0]));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index 55d8aca..5efef6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -87,7 +87,9 @@ public class RawQueryInputLayer {
}
public void updateRowRecordListEvictionUpperBound() {
- rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
+ synchronized (rowRecordList) {
+ rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
+ }
}
public LayerPointReader constructPointReader(int columnIndex) {
@@ -125,27 +127,29 @@ public class RawQueryInputLayer {
return true;
}
- for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
- Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
- if (rowRecordCandidate[columnIndex] != null) {
- hasCachedRowRecord = true;
- cachedRowRecord = rowRecordCandidate;
- currentRowIndex = i;
- break;
- }
- }
-
- if (!hasCachedRowRecord) {
- while (queryDataSet.hasNextRowInObjects()) {
- Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
- rowRecordList.put(rowRecordCandidate);
+ synchronized (rowRecordList) {
+ for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
+ Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
if (rowRecordCandidate[columnIndex] != null) {
hasCachedRowRecord = true;
cachedRowRecord = rowRecordCandidate;
- currentRowIndex = rowRecordList.size() - 1;
+ currentRowIndex = i;
break;
}
}
+
+ if (!hasCachedRowRecord) {
+ while (queryDataSet.hasNextRowInObjects()) {
+ Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
+ rowRecordList.put(rowRecordCandidate);
+ if (rowRecordCandidate[columnIndex] != null) {
+ hasCachedRowRecord = true;
+ cachedRowRecord = rowRecordCandidate;
+ currentRowIndex = rowRecordList.size() - 1;
+ break;
+ }
+ }
+ }
}
return hasCachedRowRecord;