You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/27 05:51:25 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1][IOTDB-5815] Fix Npe when UDF spilling data to disk
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 313aa143a6 [To rel/1.1][IOTDB-5815] Fix Npe when UDF spilling data to disk
313aa143a6 is described below
commit 313aa143a6dbef703bc5c7383bf1d0bb6d09735f
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Thu Apr 27 13:51:16 2023 +0800
[To rel/1.1][IOTDB-5815] Fix Npe when UDF spilling data to disk
---
.../iotdb/commons/udf/service/UDFClassLoaderManager.java | 6 +++---
.../mpp/execution/operator/process/TransformOperator.java | 13 +++++++++----
.../plan/expression/visitor/ColumnTransformerVisitor.java | 2 +-
.../plan/expression/visitor/IntermediateLayerVisitor.java | 4 ++--
.../transformation/dag/builder/EvaluationDAGBuilder.java | 4 ++--
.../transformation/dag/input/QueryDataSetInputLayer.java | 5 +++--
.../dag/intermediate/ConstantIntermediateLayer.java | 3 ++-
.../transformation/dag/intermediate/IntermediateLayer.java | 4 ++--
.../intermediate/MultiInputColumnIntermediateLayer.java | 2 +-
.../SingleInputColumnMultiReferenceIntermediateLayer.java | 2 +-
.../SingleInputColumnSingleReferenceIntermediateLayer.java | 2 +-
.../iotdb/db/mpp/transformation/dag/udf/UDTFContext.java | 2 +-
.../iotdb/db/mpp/transformation/dag/udf/UDTFExecutor.java | 2 +-
.../mpp/transformation/datastructure/SerializableList.java | 6 +++---
.../row/ElasticSerializableRowRecordList.java | 6 +++---
.../datastructure/row/SerializableRowRecordList.java | 2 +-
.../datastructure/tv/ElasticSerializableBinaryTVList.java | 2 +-
.../datastructure/tv/ElasticSerializableTVList.java | 8 ++++----
.../datastructure/tv/SerializableTVList.java | 2 +-
.../iotdb/db/query/control/QueryResourceManager.java | 7 +------
.../iotdb/db/service/TemporaryQueryDataFileService.java | 14 +++++++-------
.../db/query/udf/datastructure/SerializableListTest.java | 2 +-
22 files changed, 51 insertions(+), 49 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
index c1192a4e26..1ee3f4f78f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
@@ -38,7 +38,7 @@ public class UDFClassLoaderManager implements IService {
private final String libRoot;
/** The keys in the map are the query IDs of the UDF queries being executed. */
- private final Map<Long, UDFClassLoader> queryIdToUDFClassLoaderMap;
+ private final Map<String, UDFClassLoader> queryIdToUDFClassLoaderMap;
/**
* activeClassLoader is used to load all classes under libRoot. libRoot may be updated before the
@@ -54,12 +54,12 @@ public class UDFClassLoaderManager implements IService {
activeClassLoader = null;
}
- public void initializeUDFQuery(long queryId) {
+ public void initializeUDFQuery(String queryId) {
activeClassLoader.acquire();
queryIdToUDFClassLoaderMap.put(queryId, activeClassLoader);
}
- public void finalizeUDFQuery(long queryId) {
+ public void finalizeUDFQuery(String queryId) {
UDFClassLoader classLoader = queryIdToUDFClassLoaderMap.remove(queryId);
try {
if (classLoader != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 751c121911..f5c056a3e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -77,6 +77,8 @@ public class TransformOperator implements ProcessOperator {
protected TimeSelector timeHeap;
protected boolean[] shouldIterateReadersToNextValid;
+ private final String udtfQueryId;
+
public TransformOperator(
OperatorContext operatorContext,
Operator inputOperator,
@@ -91,6 +93,9 @@ public class TransformOperator implements ProcessOperator {
this.operatorContext = operatorContext;
this.inputOperator = inputOperator;
this.keepNull = keepNull;
+ // Use getDriverTaskID().getFullId() so that the udtfQueryId of each TransformOperator is
+ // unique.
+ this.udtfQueryId = operatorContext.getDriverContext().getDriverTaskID().getFullId();
initInputLayer(inputDataTypes);
initUdtfContext(outputExpressions, zoneId);
@@ -103,7 +108,7 @@ public class TransformOperator implements ProcessOperator {
private void initInputLayer(List<TSDataType> inputDataTypes) throws QueryProcessException {
inputLayer =
new QueryDataSetInputLayer(
- operatorContext.getOperatorId(),
+ udtfQueryId,
udfReaderMemoryBudgetInMB,
new TsBlockInputDataSet(inputOperator, inputDataTypes));
}
@@ -120,11 +125,11 @@ public class TransformOperator implements ProcessOperator {
UDFManagementService.getInstance().acquireLock();
try {
// This statement must be surrounded by the registration lock.
- UDFClassLoaderManager.getInstance().initializeUDFQuery(operatorContext.getOperatorId());
+ UDFClassLoaderManager.getInstance().initializeUDFQuery(udtfQueryId);
// UDF executors will be initialized at the same time
transformers =
new EvaluationDAGBuilder(
- operatorContext.getOperatorId(),
+ udtfQueryId,
inputLayer,
inputLocations,
outputExpressions,
@@ -324,7 +329,7 @@ public class TransformOperator implements ProcessOperator {
@Override
public void close() throws Exception {
- udtfContext.finalizeUDFExecutors(operatorContext.getOperatorId());
+ udtfContext.finalizeUDFExecutors(udtfQueryId);
inputOperator.close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/ColumnTransformerVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/ColumnTransformerVisitor.java
index c980f79967..7f4f9c3ff0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/ColumnTransformerVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/ColumnTransformerVisitor.java
@@ -238,7 +238,7 @@ public class ColumnTransformerVisitor
// Mappable UDF does not need PointCollector, so memoryBudget and queryId is not
// needed.
executor.beforeStart(
- 0,
+ String.valueOf(0),
0,
expressions.stream().map(Expression::toString).collect(Collectors.toList()),
expressions.stream().map(context::getType).collect(Collectors.toList()),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
index 68681dfac7..3791cf8849 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
@@ -447,7 +447,7 @@ public class IntermediateLayerVisitor
}
public static class IntermediateLayerVisitorContext {
- long queryId;
+ String queryId;
UDTFContext udtfContext;
@@ -460,7 +460,7 @@ public class IntermediateLayerVisitor
LayerMemoryAssigner memoryAssigner;
public IntermediateLayerVisitorContext(
- long queryId,
+ String queryId,
UDTFContext udtfContext,
QueryDataSetInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/builder/EvaluationDAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/builder/EvaluationDAGBuilder.java
index 920487fabc..af25b31776 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/builder/EvaluationDAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/builder/EvaluationDAGBuilder.java
@@ -36,7 +36,7 @@ import java.util.Map;
public class EvaluationDAGBuilder {
- private final long queryId;
+ private final String queryId;
private final QueryDataSetInputLayer inputLayer;
private final Map<String, List<InputLocation>> inputLocations;
@@ -57,7 +57,7 @@ public class EvaluationDAGBuilder {
private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
public EvaluationDAGBuilder(
- long queryId,
+ String queryId,
QueryDataSetInputLayer inputLayer,
Map<String, List<InputLocation>> inputLocations,
Expression[] outputExpressions,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java
index 7825b2f423..43f3e319e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java
@@ -39,12 +39,13 @@ public class QueryDataSetInputLayer {
private ElasticSerializableRowRecordList rowRecordList;
private SafetyLine safetyLine;
- public QueryDataSetInputLayer(long queryId, float memoryBudgetInMB, IUDFInputDataSet queryDataSet)
+ public QueryDataSetInputLayer(
+ String queryId, float memoryBudgetInMB, IUDFInputDataSet queryDataSet)
throws QueryProcessException {
construct(queryId, memoryBudgetInMB, queryDataSet);
}
- private void construct(long queryId, float memoryBudgetInMB, IUDFInputDataSet queryDataSet)
+ private void construct(String queryId, float memoryBudgetInMB, IUDFInputDataSet queryDataSet)
throws QueryProcessException {
this.queryDataSet = queryDataSet;
dataTypes = queryDataSet.getDataTypes().toArray(new TSDataType[0]);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
index f8489e9139..120bda0305 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
@@ -35,7 +35,8 @@ public class ConstantIntermediateLayer extends IntermediateLayer {
private final LayerPointReader constantLayerPointReaderCache;
- public ConstantIntermediateLayer(ConstantOperand expression, long queryId, float memoryBudgetInMB)
+ public ConstantIntermediateLayer(
+ ConstantOperand expression, String queryId, float memoryBudgetInMB)
throws QueryProcessException {
super(expression, queryId, memoryBudgetInMB);
constantLayerPointReaderCache = new ConstantInputReader(expression);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
index 94608a54ba..83e2e659d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
@@ -39,10 +39,10 @@ public abstract class IntermediateLayer {
// for debug
protected final Expression expression;
- protected final long queryId;
+ protected final String queryId;
protected final float memoryBudgetInMB;
- protected IntermediateLayer(Expression expression, long queryId, float memoryBudgetInMB) {
+ protected IntermediateLayer(Expression expression, String queryId, float memoryBudgetInMB) {
this.expression = expression;
this.queryId = queryId;
this.memoryBudgetInMB = memoryBudgetInMB;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
index 9c16557c5e..7b573efc66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
@@ -64,7 +64,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
public MultiInputColumnIntermediateLayer(
Expression expression,
- long queryId,
+ String queryId,
float memoryBudgetInMB,
List<LayerPointReader> parentLayerPointReaders) {
super(expression, queryId, memoryBudgetInMB);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 38e474aaba..f23bac344c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -60,7 +60,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
public SingleInputColumnMultiReferenceIntermediateLayer(
Expression expression,
- long queryId,
+ String queryId,
float memoryBudgetInMB,
LayerPointReader parentLayerPointReader) {
super(expression, queryId, memoryBudgetInMB);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index 0ff38822d9..2299ffc69c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -54,7 +54,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
public SingleInputColumnSingleReferenceIntermediateLayer(
Expression expression,
- long queryId,
+ String queryId,
float memoryBudgetInMB,
LayerPointReader parentLayerPointReader) {
super(expression, queryId, memoryBudgetInMB);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFContext.java
index 293fc97ac4..0753428a2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFContext.java
@@ -44,7 +44,7 @@ public class UDTFContext {
}
}
- public void finalizeUDFExecutors(long queryId) {
+ public void finalizeUDFExecutors(String queryId) {
try {
for (UDTFExecutor executor : expressionName2Executor.values()) {
executor.beforeDestroy();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFExecutor.java
index c86faef2d0..4554cb44dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFExecutor.java
@@ -55,7 +55,7 @@ public class UDTFExecutor {
}
public void beforeStart(
- long queryId,
+ String queryId,
float collectorMemoryBudgetInMB,
List<String> childExpressions,
List<TSDataType> childExpressionDataTypes,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/SerializableList.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/SerializableList.java
index 9ab4449a69..82f836e3ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/SerializableList.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/SerializableList.java
@@ -48,7 +48,7 @@ public interface SerializableList {
protected static final int NOT_SERIALIZED = -1;
- protected final long queryId;
+ protected final String queryId;
protected boolean isSerialized;
protected int serializedByteLength;
@@ -58,7 +58,7 @@ public interface SerializableList {
protected RandomAccessFile file;
protected FileChannel fileChannel;
- public SerializationRecorder(long queryId) {
+ public SerializationRecorder(String queryId) {
this.queryId = queryId;
isSerialized = false;
serializedByteLength = NOT_SERIALIZED;
@@ -129,7 +129,7 @@ public interface SerializableList {
fileChannel = null;
}
- public long getQueryId() {
+ public String getQueryId() {
return queryId;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/row/ElasticSerializableRowRecordList.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/row/ElasticSerializableRowRecordList.java
index f2856331e0..a2b2717133 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/row/ElasticSerializableRowRecordList.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/row/ElasticSerializableRowRecordList.java
@@ -37,7 +37,7 @@ public class ElasticSerializableRowRecordList {
protected static final int MEMORY_CHECK_THRESHOLD = 1000;
protected TSDataType[] dataTypes;
- protected long queryId;
+ protected String queryId;
protected float memoryLimitInMB;
protected int internalRowRecordListCapacity;
protected int numCacheBlock;
@@ -65,7 +65,7 @@ public class ElasticSerializableRowRecordList {
* @param numCacheBlock Number of cache blocks.
*/
public ElasticSerializableRowRecordList(
- TSDataType[] dataTypes, long queryId, float memoryLimitInMB, int numCacheBlock)
+ TSDataType[] dataTypes, String queryId, float memoryLimitInMB, int numCacheBlock)
throws QueryProcessException {
this.dataTypes = dataTypes;
this.queryId = queryId;
@@ -110,7 +110,7 @@ public class ElasticSerializableRowRecordList {
protected ElasticSerializableRowRecordList(
TSDataType[] dataTypes,
- long queryId,
+ String queryId,
float memoryLimitInMB,
int internalRowRecordListCapacity,
int numCacheBlock) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/row/SerializableRowRecordList.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/row/SerializableRowRecordList.java
index b13096659f..334835d41c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/row/SerializableRowRecordList.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/row/SerializableRowRecordList.java
@@ -40,7 +40,7 @@ public class SerializableRowRecordList implements SerializableList {
protected static final int MIN_ARRAY_HEADER_SIZE = MIN_OBJECT_HEADER_SIZE + 4;
public static SerializableRowRecordList newSerializableRowRecordList(
- long queryId, TSDataType[] dataTypes, int internalRowRecordListCapacity) {
+ String queryId, TSDataType[] dataTypes, int internalRowRecordListCapacity) {
SerializationRecorder recorder = new SerializationRecorder(queryId);
return new SerializableRowRecordList(recorder, dataTypes, internalRowRecordListCapacity);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/ElasticSerializableBinaryTVList.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/ElasticSerializableBinaryTVList.java
index bb6c06a889..b6ee2711db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/ElasticSerializableBinaryTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/ElasticSerializableBinaryTVList.java
@@ -35,7 +35,7 @@ public class ElasticSerializableBinaryTVList extends ElasticSerializableTVList {
protected long totalByteArrayLengthLimit;
protected long totalByteArrayLength;
- public ElasticSerializableBinaryTVList(long queryId, float memoryLimitInMB, int cacheSize) {
+ public ElasticSerializableBinaryTVList(String queryId, float memoryLimitInMB, int cacheSize) {
super(TSDataType.TEXT, queryId, memoryLimitInMB, cacheSize);
byteArrayLengthForMemoryControl = SerializableList.INITIAL_BYTE_ARRAY_LENGTH_FOR_MEMORY_CONTROL;
totalByteArrayLengthLimit = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/ElasticSerializableTVList.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/ElasticSerializableTVList.java
index db8dab6c39..eacec154aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/ElasticSerializableTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/ElasticSerializableTVList.java
@@ -38,14 +38,14 @@ import java.util.List;
public class ElasticSerializableTVList implements PointCollector {
public static ElasticSerializableTVList newElasticSerializableTVList(
- TSDataType dataType, long queryId, float memoryLimitInMB, int cacheSize) {
+ TSDataType dataType, String queryId, float memoryLimitInMB, int cacheSize) {
return dataType.equals(TSDataType.TEXT)
? new ElasticSerializableBinaryTVList(queryId, memoryLimitInMB, cacheSize)
: new ElasticSerializableTVList(dataType, queryId, memoryLimitInMB, cacheSize);
}
protected TSDataType dataType;
- protected long queryId;
+ protected String queryId;
protected float memoryLimitInMB;
protected int internalTVListCapacity;
protected int cacheSize;
@@ -62,7 +62,7 @@ public class ElasticSerializableTVList implements PointCollector {
protected int evictionUpperBound;
protected ElasticSerializableTVList(
- TSDataType dataType, long queryId, float memoryLimitInMB, int cacheSize) {
+ TSDataType dataType, String queryId, float memoryLimitInMB, int cacheSize) {
this.dataType = dataType;
this.queryId = queryId;
this.memoryLimitInMB = memoryLimitInMB;
@@ -83,7 +83,7 @@ public class ElasticSerializableTVList implements PointCollector {
protected ElasticSerializableTVList(
TSDataType dataType,
- long queryId,
+ String queryId,
float memoryLimitInMB,
int internalTVListCapacity,
int cacheSize) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/SerializableTVList.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/SerializableTVList.java
index a031be2a2b..c521a41fe4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/SerializableTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/tv/SerializableTVList.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
public abstract class SerializableTVList extends BatchData implements SerializableList {
- public static SerializableTVList newSerializableTVList(TSDataType dataType, long queryId) {
+ public static SerializableTVList newSerializableTVList(TSDataType dataType, String queryId) {
SerializationRecorder recorder = new SerializationRecorder(queryId);
switch (dataType) {
case INT32:
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index d6ea3cb31c..eda3223076 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.db.query.control;
-import org.apache.iotdb.db.service.TemporaryQueryDataFileService;
-
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -68,13 +66,10 @@ public class QueryResourceManager {
* query tokens created by this jdbc request must be cleared.
*/
// Suppress high Cognitive Complexity warning
+ // Attention: since V1.0, the query module does not use this method for cleanup.
public void endQuery(long queryId) {
-
// remove usage of opened file paths of current thread
filePathsManager.removeUsedFilesForQuery(queryId);
-
- // close and delete UDF temp files
- TemporaryQueryDataFileService.getInstance().deregister(queryId);
}
public QueryFileManager getQueryFileManager() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java b/server/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java
index 4cd00dde1a..1b33990cec 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java
@@ -50,7 +50,7 @@ public class TemporaryQueryDataFileService implements IService {
+ "tmp";
private final AtomicLong uniqueDataId;
- private final Map<Long, List<SerializationRecorder>> recorders;
+ private final Map<String, List<SerializationRecorder>> recorders;
private TemporaryQueryDataFileService() {
uniqueDataId = new AtomicLong(0);
@@ -58,7 +58,7 @@ public class TemporaryQueryDataFileService implements IService {
}
public String register(SerializationRecorder recorder) throws IOException {
- long queryId = recorder.getQueryId();
+ String queryId = recorder.getQueryId();
if (!recorders.containsKey(queryId)) {
recorders.put(queryId, new ArrayList<>());
}
@@ -69,7 +69,7 @@ public class TemporaryQueryDataFileService implements IService {
return getFileName(dirName, uniqueDataId.getAndIncrement());
}
- public void deregister(long queryId) {
+ public void deregister(String queryId) {
List<SerializationRecorder> recorderList = recorders.remove(queryId);
if (recorderList == null) {
return;
@@ -79,14 +79,14 @@ public class TemporaryQueryDataFileService implements IService {
recorder.closeFile();
} catch (IOException e) {
logger.warn(
- String.format("Failed to close file in method deregister(%d), because %s", queryId, e));
+ String.format("Failed to close file in method deregister(%s), because %s", queryId, e));
}
}
try {
FileUtils.cleanDirectory(SystemFileFactory.INSTANCE.getFile(getDirName(queryId)));
} catch (IOException e) {
logger.warn(
- String.format("Failed to clean dir in method deregister(%d), because %s", queryId, e));
+ String.format("Failed to clean dir in method deregister(%s), because %s", queryId, e));
}
}
@@ -98,7 +98,7 @@ public class TemporaryQueryDataFileService implements IService {
FileUtils.forceMkdir(file);
}
- private String getDirName(long queryId) {
+ private String getDirName(String queryId) {
return TEMPORARY_FILE_DIR + File.separator + queryId + File.separator;
}
@@ -118,7 +118,7 @@ public class TemporaryQueryDataFileService implements IService {
@Override
public void stop() {
for (Object queryId : recorders.keySet().toArray()) {
- deregister((Long) queryId);
+ deregister((String) queryId);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/udf/datastructure/SerializableListTest.java b/server/src/test/java/org/apache/iotdb/db/query/udf/datastructure/SerializableListTest.java
index dbbd2c2bfa..7ef96f91da 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/udf/datastructure/SerializableListTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/udf/datastructure/SerializableListTest.java
@@ -29,7 +29,7 @@ public abstract class SerializableListTest {
protected static final float MEMORY_USAGE_LIMIT_IN_MB = 100f;
protected static final int CACHE_SIZE = 3;
- protected static final long QUERY_ID = 0;
+ protected static final String QUERY_ID = String.valueOf(0);
protected static final int INTERNAL_ROW_RECORD_LIST_CAPACITY = 8;