You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/19 06:05:40 UTC
[iotdb] branch master updated: [IOTDB-5689] Close ISink when ISourceHandle is closed
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 487dd31417 [IOTDB-5689] Close ISink when ISourceHandle is closed
487dd31417 is described below
commit 487dd31417abf0a04864b1a812e191784a7d2e98
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Sun Mar 19 14:05:34 2023 +0800
[IOTDB-5689] Close ISink when ISourceHandle is closed
---
.../iotdb/db/mpp/execution/driver/Driver.java | 13 +-
.../execution/exchange/MPPDataExchangeManager.java | 67 ++-
.../mpp/execution/exchange/SharedTsBlockQueue.java | 6 +-
.../db/mpp/execution/exchange/sink/ISink.java | 4 +
.../mpp/execution/exchange/sink/ISinkHandle.java | 3 +
.../execution/exchange/sink/LocalSinkChannel.java | 7 +
.../execution/exchange/sink/ShuffleSinkHandle.java | 24 +-
.../mpp/execution/exchange/sink/SinkChannel.java | 11 +-
.../execution/exchange/source/SourceHandle.java | 59 ++-
.../iotdb/db/mpp/execution/operator/Operator.java | 10 +-
.../process/AbstractConsumeAllOperator.java | 4 +-
.../operator/process/AbstractIntoOperator.java | 7 +-
.../operator/process/AggregationOperator.java | 6 +-
.../operator/process/DeviceMergeOperator.java | 6 +-
.../operator/process/DeviceViewOperator.java | 17 +-
.../execution/operator/process/FillOperator.java | 6 +-
.../operator/process/FilterAndProjectOperator.java | 6 +-
.../execution/operator/process/LimitOperator.java | 6 +-
.../operator/process/LinearFillOperator.java | 8 +-
.../operator/process/MergeSortOperator.java | 8 +-
.../execution/operator/process/OffsetOperator.java | 6 +-
.../process/RawDataAggregationOperator.java | 9 +-
.../operator/process/SingleDeviceViewOperator.java | 6 +-
.../process/SingleInputAggregationOperator.java | 6 +-
.../process/SlidingWindowAggregationOperator.java | 4 +-
.../execution/operator/process/SortOperator.java | 6 +-
.../operator/process/TagAggregationOperator.java | 8 +-
.../operator/process/TransformOperator.java | 16 +-
.../process/join/HorizontallyConcatOperator.java | 8 +-
.../process/join/RowBasedTimeJoinOperator.java | 11 +-
.../operator/process/join/TimeJoinOperator.java | 9 +-
.../last/AbstractUpdateLastCacheOperator.java | 4 +-
.../last/AlignedUpdateLastCacheOperator.java | 2 +-
.../process/last/LastQueryCollectOperator.java | 6 +-
.../process/last/LastQueryMergeOperator.java | 12 +-
.../operator/process/last/LastQueryOperator.java | 7 +-
.../process/last/LastQuerySortOperator.java | 7 +-
.../process/last/UpdateLastCacheOperator.java | 2 +-
.../schema/CountGroupByLevelMergeOperator.java | 6 +-
.../schema/CountGroupByLevelScanOperator.java | 6 +-
.../operator/schema/CountMergeOperator.java | 7 +-
.../schema/NodeManageMemoryMergeOperator.java | 6 +-
.../operator/schema/NodePathsConvertOperator.java | 6 +-
.../operator/schema/NodePathsCountOperator.java | 6 +-
.../operator/schema/SchemaCountOperator.java | 6 +-
.../operator/schema/SchemaFetchMergeOperator.java | 6 +-
.../operator/schema/SchemaFetchScanOperator.java | 6 +-
.../operator/schema/SchemaQueryMergeOperator.java | 6 +-
.../schema/SchemaQueryOrderByHeatOperator.java | 6 +-
.../operator/schema/SchemaQueryScanOperator.java | 6 +-
.../operator/sink/IdentitySinkOperator.java | 34 +-
.../operator/sink/ShuffleHelperOperator.java | 39 +-
.../AbstractSeriesAggregationScanOperator.java | 6 +-
.../operator/source/AlignedSeriesScanOperator.java | 6 +-
.../operator/source/ExchangeOperator.java | 6 +-
.../operator/source/LastCacheScanOperator.java | 6 +-
.../operator/source/SeriesScanOperator.java | 6 +-
.../operator/source/ShowQueriesOperator.java | 6 +-
.../db/mpp/transformation/api/YieldableReader.java | 6 +-
.../transformation/dag/input/IUDFInputDataSet.java | 2 +-
.../dag/input/QueryDataSetInputLayer.java | 4 +-
.../dag/input/TsBlockInputDataSet.java | 2 +-
.../MultiInputColumnIntermediateLayer.java | 10 +-
...InputColumnMultiReferenceIntermediateLayer.java | 12 +-
...nputColumnSingleReferenceIntermediateLayer.java | 10 +-
.../dag/transformer/Transformer.java | 4 +-
.../dag/transformer/binary/BinaryTransformer.java | 4 +-
.../transformer/binary/LogicBinaryTransformer.java | 2 +-
.../multi/MappableUDFQueryRowTransformer.java | 2 +-
.../transformer/multi/UDFQueryRowTransformer.java | 2 +-
.../multi/UDFQueryRowWindowTransformer.java | 2 +-
.../multi/UniversalUDFQueryTransformer.java | 4 +-
.../transformer/ternary/TernaryTransformer.java | 4 +-
.../dag/transformer/unary/IsNullTransformer.java | 2 +-
.../dag/transformer/unary/UnaryTransformer.java | 2 +-
.../unary/scalar/DiffFunctionTransformer.java | 2 +-
.../transformation/dag/util/LayerCacheUtils.java | 10 +-
.../iotdb/db/mpp/execution/exchange/StubSink.java | 5 +
.../operator/AggregationOperatorTest.java | 26 +-
.../AlignedSeriesAggregationScanOperatorTest.java | 38 +-
.../operator/AlignedSeriesScanOperatorTest.java | 6 +-
.../operator/DeviceMergeOperatorTest.java | 565 ---------------------
.../execution/operator/DeviceViewOperatorTest.java | 3 +-
.../mpp/execution/operator/FillOperatorTest.java | 16 +-
.../operator/HorizontallyConcatOperatorTest.java | 2 +-
.../operator/LastQueryMergeOperatorTest.java | 38 +-
.../execution/operator/LastQueryOperatorTest.java | 4 +-
.../operator/LastQuerySortOperatorTest.java | 5 +-
.../mpp/execution/operator/LimitOperatorTest.java | 2 +-
.../execution/operator/LinearFillOperatorTest.java | 56 +-
.../execution/operator/MergeSortOperatorTest.java | 38 +-
.../mpp/execution/operator/OffsetOperatorTest.java | 6 +-
.../operator/RawDataAggregationOperatorTest.java | 54 +-
.../SeriesAggregationScanOperatorTest.java | 65 ++-
.../execution/operator/SeriesScanOperatorTest.java | 2 +-
.../operator/SingleDeviceViewOperatorTest.java | 3 +-
.../SlidingWindowAggregationOperatorTest.java | 3 +-
.../execution/operator/TimeJoinOperatorTest.java | 6 +-
.../operator/UpdateLastCacheOperatorTest.java | 6 +-
.../operator/schema/SchemaCountOperatorTest.java | 4 +-
.../schema/SchemaQueryScanOperatorTest.java | 4 +-
thrift/src/main/thrift/datanode.thrift | 12 +-
102 files changed, 674 insertions(+), 970 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 2fd9a56911..c3be5e1bbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -190,7 +190,16 @@ public abstract class Driver implements IDriver {
private boolean isFinishedInternal() {
checkLockHeld("Lock must be held to call isFinishedInternal");
- boolean finished = state.get() != State.ALIVE || driverContext.isDone() || root.isFinished();
+ boolean finished;
+ try {
+ finished =
+ state.get() != State.ALIVE
+ || driverContext.isDone()
+ || root.isFinished()
+ || sink.isClosed();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
if (finished) {
state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
}
@@ -219,7 +228,7 @@ public abstract class Driver implements IDriver {
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
- throw t;
+ throw new RuntimeException(t);
}
// Driver thread was interrupted which should only happen if the task is already finished.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 39cf36a5c0..501781b408 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TCloseSinkChannelEvent;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
@@ -54,6 +55,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -96,22 +98,16 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
"[ProcessGetTsBlockRequest] sequence ID in [{}, {})",
req.getStartSequenceId(),
req.getEndSequenceId());
- if (!shuffleSinkHandles.containsKey(req.getSourceFragmentInstanceId())) {
- throw new TException(
- "Source fragment instance not found. Fragment instance ID: "
- + req.getSourceFragmentInstanceId()
- + ".");
+ TGetDataBlockResponse resp = new TGetDataBlockResponse(new ArrayList<>());
+ ISinkHandle sinkHandle = shuffleSinkHandles.get(req.getSourceFragmentInstanceId());
+ if (sinkHandle == null) {
+ return resp;
}
- TGetDataBlockResponse resp = new TGetDataBlockResponse();
// index of the channel must be a SinkChannel
- SinkChannel sinkChannelHandle =
- (SinkChannel)
- (shuffleSinkHandles
- .get(req.getSourceFragmentInstanceId())
- .getChannel(req.getIndex()));
+ SinkChannel sinkChannel = (SinkChannel) (sinkHandle.getChannel(req.getIndex()));
for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
try {
- ByteBuffer serializedTsBlock = sinkChannelHandle.getSerializedTsBlock(i);
+ ByteBuffer serializedTsBlock = sinkChannel.getSerializedTsBlock(i);
resp.addToTsBlocks(serializedTsBlock);
} catch (IllegalStateException | IOException e) {
throw new TException(e);
@@ -140,15 +136,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
e.getStartSequenceId(),
e.getEndSequenceId(),
e.getSourceFragmentInstanceId());
- if (!shuffleSinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
+ ISinkHandle sinkHandle = shuffleSinkHandles.get(e.getSourceFragmentInstanceId());
+ if (sinkHandle == null) {
LOGGER.debug(
"received ACK event but target FragmentInstance[{}] is not found.",
e.getSourceFragmentInstanceId());
return;
}
// index of the channel must be a SinkChannel
- ((SinkChannel)
- (shuffleSinkHandles.get(e.getSourceFragmentInstanceId()).getChannel(e.getIndex())))
+ ((SinkChannel) (sinkHandle.getChannel(e.getIndex())))
.acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
} catch (Throwable t) {
LOGGER.warn(
@@ -162,6 +158,36 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
}
}
+ @Override
+ public void onCloseSinkChannelEvent(TCloseSinkChannelEvent e) throws TException {
+ try (SetThreadName fragmentInstanceName =
+ new SetThreadName(
+ createFullId(
+ e.sourceFragmentInstanceId.queryId,
+ e.sourceFragmentInstanceId.fragmentId,
+ e.sourceFragmentInstanceId.instanceId))) {
+ LOGGER.debug(
+ "Closed source handle of ShuffleSinkHandle {}, channel index: {}.",
+ e.getSourceFragmentInstanceId(),
+ e.getIndex());
+ ISinkHandle sinkHandle = shuffleSinkHandles.get(e.getSourceFragmentInstanceId());
+ if (sinkHandle == null) {
+ LOGGER.debug(
+ "received CloseSinkChannelEvent but target FragmentInstance[{}] is not found.",
+ e.getSourceFragmentInstanceId());
+ return;
+ }
+ sinkHandle.getChannel(e.getIndex()).close();
+ } catch (Throwable t) {
+ LOGGER.warn(
+ "Close channel of ShuffleSinkHandle {}, index {} failed.",
+ e.getSourceFragmentInstanceId(),
+ e.getIndex(),
+ t);
+ throw t;
+ }
+ }
+
@Override
public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException {
long startTime = System.nanoTime();
@@ -465,9 +491,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
sourceHandleMap == null ? null : (LocalSourceHandle) sourceHandleMap.get(remotePlanNodeId);
if (localSourceHandle != null) {
LOGGER.debug("Get SharedTsBlockQueue from local source handle");
- queue =
- ((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
- .getSharedTsBlockQueue();
+ queue = localSourceHandle.getSharedTsBlockQueue();
} else {
LOGGER.debug("Create SharedTsBlockQueue");
queue = new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, localMemoryManager);
@@ -623,11 +647,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
localPlanNodeId,
localFragmentInstanceId);
SharedTsBlockQueue queue;
- if (shuffleSinkHandles.containsKey(remoteFragmentInstanceId)) {
+ ISinkHandle sinkHandle = shuffleSinkHandles.get(remoteFragmentInstanceId);
+ if (sinkHandle != null) {
LOGGER.debug("Get SharedTsBlockQueue from local sink handle");
- queue =
- ((LocalSinkChannel) shuffleSinkHandles.get(remoteFragmentInstanceId).getChannel(index))
- .getSharedTsBlockQueue();
+ queue = ((LocalSinkChannel) (sinkHandle.getChannel(index))).getSharedTsBlockQueue();
} else {
LOGGER.debug("Create SharedTsBlockQueue");
queue =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index b62fffc8c6..53d668cc86 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -131,6 +131,10 @@ public class SharedTsBlockQueue {
return queue.isEmpty();
}
+ public boolean isClosed() {
+ return closed;
+ }
+
public int getNumOfBufferedTsBlocks() {
return queue.size();
}
@@ -147,7 +151,7 @@ public class SharedTsBlockQueue {
public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
LOGGER.debug("[SignalNoMoreTsBlockOnQueue]");
if (closed) {
- LOGGER.warn("queue has been destroyed");
+ LOGGER.debug("The queue has been destroyed when calling setNoMoreTsBlocks.");
return;
}
this.noMoreTsBlocks = noMoreTsBlocks;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISink.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISink.java
index 5af20abf60..961f49fb70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISink.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISink.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.exchange.sink;
+import org.apache.iotdb.db.mpp.execution.driver.Driver;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -78,6 +79,9 @@ public interface ISink {
*/
void close();
+ /** Return true if this ISink has been closed. Used in {@link Driver#isFinishedInternal()} */
+ boolean isClosed();
+
/** Set max bytes this ISink can reserve from memory pool */
void setMaxBytesCanReserve(long maxBytesCanReserve);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
index c600f60eb6..5fb511e402 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
@@ -33,4 +33,7 @@ public interface ISinkHandle extends ISink {
/** Open specified channel of ISinkHandle. */
void tryOpenChannel(int channelIndex);
+
+ /** Return true if the specified channel is closed. */
+ boolean isChannelClosed(int index);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index 29ff8b7a12..afd043c9af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -225,5 +225,12 @@ public class LocalSinkChannel implements ISinkChannel {
}
}
+ @Override
+ public boolean isClosed() {
+ synchronized (queue) {
+ return queue.isClosed();
+ }
+ }
+
// end region
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 12a438d845..7060541acc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -26,12 +26,14 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Set;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE;
@@ -46,6 +48,8 @@ public class ShuffleSinkHandle implements ISinkHandle {
private final boolean[] channelOpened;
+ private final Set<Integer> closedChannel = Sets.newConcurrentHashSet();
+
private final DownStreamChannelIndex downStreamChannelIndex;
private final int channelNum;
@@ -141,6 +145,11 @@ public class ShuffleSinkHandle implements ISinkHandle {
}
}
+ @Override
+ public boolean isClosed() {
+ return closedChannel.size() == downStreamChannelList.size();
+ }
+
@Override
public synchronized boolean isAborted() {
return aborted;
@@ -235,6 +244,19 @@ public class ShuffleSinkHandle implements ISinkHandle {
}
}
+ @Override
+ public boolean isChannelClosed(int index) {
+ if (closedChannel.contains(index)) {
+ return true;
+ } else {
+ if (downStreamChannelList.get(index).isClosed()) {
+ closedChannel.add(index);
+ return true;
+ }
+ return false;
+ }
+ }
+
// region ============ Shuffle Related ============
public enum ShuffleStrategyEnum {
PLAIN,
@@ -281,7 +303,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
private boolean satisfy(int channelIndex) {
// downStreamChannel is always an ISinkChannel
ISinkChannel channel = downStreamChannelList.get(channelIndex);
- if (channel.isNoMoreTsBlocks()) {
+ if (channel.isNoMoreTsBlocks() || channel.isClosed()) {
return false;
}
return channel.getBufferRetainedSizeInBytes() <= channelMemoryThreshold
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index fbcdaf2e0c..5cd28de462 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -157,6 +157,10 @@ public class SinkChannel implements ISinkChannel {
long startTime = System.nanoTime();
try {
Validate.notNull(tsBlock, "tsBlocks is null");
+ if (closed) {
+ // SinkChannel may have been closed by its downstream SourceHandle
+ return;
+ }
checkState();
if (!blocked.isDone()) {
throw new IllegalStateException("Sink handle is blocked.");
@@ -254,6 +258,11 @@ public class SinkChannel implements ISinkChannel {
LOGGER.debug("[EndCloseSinkChannel]");
}
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
@Override
public synchronized boolean isAborted() {
return aborted;
@@ -353,8 +362,6 @@ public class SinkChannel implements ISinkChannel {
private void checkState() {
if (aborted) {
throw new IllegalStateException("SinkChannel is aborted.");
- } else if (closed) {
- throw new IllegalStateException("SinkChannel is closed.");
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index db462721d0..b66dd4bce4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TCloseSinkChannelEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
@@ -373,6 +374,7 @@ public class SourceHandle implements ISourceHandle {
.clearMemoryReservationMap(
localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
closed = true;
+ executorService.submit(new SendCloseSinkChannelEventTask());
currSequenceId = lastSequenceId + 1;
sourceHandleListener.onFinished(this);
}
@@ -490,8 +492,20 @@ public class SourceHandle implements ISourceHandle {
try (SyncDataNodeMPPDataExchangeServiceClient client =
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
TGetDataBlockResponse resp = client.getDataBlock(req);
-
int tsBlockNum = resp.getTsBlocks().size();
+ if (tsBlockNum == 0) {
+ if (!closed) {
+ // failed to pull TsBlocks
+ LOGGER.warn(
+ "{} failed to pull TsBlocks [{}] to [{}] from SinkHandle {}, channel index {},",
+ localFragmentInstanceId,
+ startSequenceId,
+ endSequenceId,
+ remoteFragmentInstanceId,
+ indexOfUpstreamSinkHandle);
+ }
+ return;
+ }
List<ByteBuffer> tsBlocks = new ArrayList<>(tsBlockNum);
tsBlocks.addAll(resp.getTsBlocks());
@@ -620,4 +634,47 @@ public class SourceHandle implements ISourceHandle {
}
}
}
+
+ class SendCloseSinkChannelEventTask implements Runnable {
+
+ @Override
+ public void run() {
+ try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+ LOGGER.debug(
+ "[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
+ remoteFragmentInstanceId,
+ indexOfUpstreamSinkHandle);
+ int attempt = 0;
+ TCloseSinkChannelEvent closeSinkChannelEvent =
+ new TCloseSinkChannelEvent(remoteFragmentInstanceId, indexOfUpstreamSinkHandle);
+ while (attempt < MAX_ATTEMPT_TIMES) {
+ attempt += 1;
+ long startTime = System.nanoTime();
+ try (SyncDataNodeMPPDataExchangeServiceClient client =
+ mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
+ client.onCloseSinkChannelEvent(closeSinkChannelEvent);
+ break;
+ } catch (Throwable e) {
+ LOGGER.warn(
+ "[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
+ remoteFragmentInstanceId,
+ indexOfUpstreamSinkHandle);
+ if (attempt == MAX_ATTEMPT_TIMES) {
+ synchronized (SourceHandle.this) {
+ sourceHandleListener.onFailure(SourceHandle.this, e);
+ }
+ }
+ try {
+ Thread.sleep(retryIntervalInMs);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ synchronized (SourceHandle.this) {
+ sourceHandleListener.onFailure(SourceHandle.this, e);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index aaf404d467..d762220da3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -38,7 +38,7 @@ public interface Operator extends AutoCloseable {
return NOT_BLOCKED;
}
- default TsBlock nextWithTimer() {
+ default TsBlock nextWithTimer() throws Exception {
OperatorContext context = getOperatorContext();
long startTime = System.nanoTime();
@@ -51,9 +51,9 @@ public interface Operator extends AutoCloseable {
}
/** Gets next tsBlock from this operator. If no data is currently available, return null. */
- TsBlock next();
+ TsBlock next() throws Exception;
- default boolean hasNextWithTimer() {
+ default boolean hasNextWithTimer() throws Exception {
OperatorContext context = getOperatorContext();
long startTime = System.nanoTime();
@@ -65,7 +65,7 @@ public interface Operator extends AutoCloseable {
}
/** @return true if the operator has more data, otherwise false */
- boolean hasNext();
+ boolean hasNext() throws Exception;
/** This method will always be called before releasing the Operator reference. */
@Override
@@ -74,7 +74,7 @@ public interface Operator extends AutoCloseable {
/**
* Is this operator completely finished processing and no more output TsBlock will be produced.
*/
- boolean isFinished();
+ boolean isFinished() throws Exception;
/**
* We should also consider the memory used by its children operator, so the calculation logic may
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
index 1ae0fbf801..7c33664327 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
@@ -82,7 +82,7 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
* @return true if results of all children are ready. Return false if some children is blocked or
* return null.
*/
- protected boolean prepareInput() {
+ protected boolean prepareInput() throws Exception {
boolean allReady = true;
for (int i = 0; i < inputOperatorsCount; i++) {
if (!isEmpty(i)) {
@@ -120,7 +120,7 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
}
}
- protected TsBlock getNextTsBlock(int childIndex) {
+ protected TsBlock getNextTsBlock(int childIndex) throws Exception {
return children.get(childIndex).nextWithTimer();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 0b5c189c76..3d65c1659c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -131,19 +131,18 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return !finished;
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
checkLastWriteOperation();
if (!processTsBlock(cachedTsBlock)) {
return null;
}
cachedTsBlock = null;
-
if (child.hasNextWithTimer()) {
TsBlock inputTsBlock = child.nextWithTimer();
processTsBlock(inputTsBlock);
@@ -293,7 +292,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return finished;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 364086c4fb..0102c0be07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -94,12 +94,12 @@ public class AggregationOperator extends AbstractConsumeAllOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
// start stopwatch
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
@@ -135,7 +135,7 @@ public class AggregationOperator extends AbstractConsumeAllOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !this.hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 888db23336..cb7630dea2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -113,7 +113,7 @@ public class DeviceMergeOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
// get new input TsBlock
for (int i = 0; i < inputOperatorsCount; i++) {
if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNextWithTimer()) {
@@ -197,7 +197,7 @@ public class DeviceMergeOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (finished) {
return false;
}
@@ -224,7 +224,7 @@ public class DeviceMergeOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
if (finished) {
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 242586a78e..c0af80f7fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
import java.util.List;
@@ -107,11 +108,16 @@ public class DeviceViewOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (!getCurDeviceOperator().hasNextWithTimer()) {
+ // close finished child
+ getCurDeviceOperator().close();
+ deviceOperators.set(deviceIndex, null);
+ // increment index, move to next child
deviceIndex++;
return null;
}
+
TsBlock tsBlock = getCurDeviceOperator().nextWithTimer();
if (tsBlock == null) {
return null;
@@ -138,19 +144,20 @@ public class DeviceViewOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return deviceIndex < devices.size();
}
@Override
public void close() throws Exception {
- for (Operator child : deviceOperators) {
- child.close();
+ for (int i = deviceIndex, n = deviceOperators.size(); i < n; i++) {
+ Validate.notNull(deviceOperators.get(i));
+ deviceOperators.get(i).close();
}
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !this.hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index 92ec771f32..c88ab34483 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -57,7 +57,7 @@ public class FillOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock block = child.nextWithTimer();
if (block == null) {
return null;
@@ -78,7 +78,7 @@ public class FillOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return child.hasNextWithTimer();
}
@@ -88,7 +88,7 @@ public class FillOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return child.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index c08fcfd27d..0016c7255c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -94,7 +94,7 @@ public class FilterAndProjectOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock input = inputOperator.nextWithTimer();
if (input == null) {
return null;
@@ -192,12 +192,12 @@ public class FilterAndProjectOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return inputOperator.hasNextWithTimer();
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return inputOperator.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 68604cd1ce..b208e551b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -51,7 +51,7 @@ public class LimitOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock block = child.nextWithTimer();
if (block == null) {
return null;
@@ -67,7 +67,7 @@ public class LimitOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return remainingLimit > 0 && child.hasNextWithTimer();
}
@@ -77,7 +77,7 @@ public class LimitOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return remainingLimit == 0 || child.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 3792829196..2d785311a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -82,7 +82,7 @@ public class LinearFillOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
// make sure we call child.next() at most once
if (cachedTsBlock.isEmpty()) {
@@ -142,7 +142,7 @@ public class LinearFillOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
// if child.hasNext() return false, it means that there is no more tsBlocks
noMoreTsBlock = !child.hasNextWithTimer();
// if there is more tsBlock, we can call child.next() once
@@ -156,7 +156,7 @@ public class LinearFillOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return cachedTsBlock.isEmpty() && child.isFinished();
}
@@ -208,7 +208,7 @@ public class LinearFillOperator implements ProcessOperator {
/**
* @return true if we succeed to get next TsBlock and add it into cachedTsBlock, otherwise false
*/
- private boolean tryToGetNextTsBlock() {
+ private boolean tryToGetNextTsBlock() throws Exception {
if (canCallNext) { // if we can call child.next(), we call that and cache it in
// cachedTsBlock
canCallNext = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index 58b5af4a60..0fdc3f2630 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -83,7 +83,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
// start stopwatch
long startTime = System.nanoTime();
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
@@ -144,7 +144,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (finished) {
return false;
}
@@ -164,7 +164,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
if (finished) {
return true;
}
@@ -217,7 +217,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
* some children is blocked or return null.
*/
@Override
- protected boolean prepareInput() {
+ protected boolean prepareInput() throws Exception {
boolean allReady = true;
for (int i = 0; i < inputOperatorsCount; i++) {
if (noMoreTsBlocks[i] || !isEmpty(i)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index e6f146eac1..37127e82aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -51,7 +51,7 @@ public class OffsetOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock block = child.nextWithTimer();
if (block == null) {
return null;
@@ -66,7 +66,7 @@ public class OffsetOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return child.hasNextWithTimer();
}
@@ -76,7 +76,7 @@ public class OffsetOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return child.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 97e6f6d93c..ed765d9b47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -73,19 +73,19 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
this.resultTsBlockBuilder = windowManager.createResultTsBlockBuilder(aggregators);
}
- private boolean hasMoreData() {
+ private boolean hasMoreData() throws Exception {
return !(inputTsBlock == null || inputTsBlock.isEmpty())
|| child.hasNextWithTimer()
|| hasCachedDataInAggregator;
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return windowManager.hasNext(hasMoreData());
}
@Override
- protected boolean calculateNextAggregationResult() {
+ protected boolean calculateNextAggregationResult() throws Exception {
// if needSkip is true, just get the tsBlock directly.
while (needSkip || !calculateFromRawData()) {
@@ -107,7 +107,8 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
if (windowManager.notInitializedLastTimeWindow()) {
initWindowAndAggregators();
}
- // If the window is not initialized, it just returns to avoid invoking updateResultTsBlock()
+ // If the window is not initialized, it just returns to avoid invoking
+ // updateResultTsBlock()
// but if it's skipping the last window, just break and keep skipping.
if (needSkip || windowManager.isCurWindowInit()) break;
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
index a22d52c9e2..51464b742c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
@@ -81,7 +81,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock tsBlock = deviceOperator.nextWithTimer();
if (tsBlock == null) {
return null;
@@ -103,7 +103,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return deviceOperator.hasNextWithTimer();
}
@@ -113,7 +113,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !this.hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 0fb561e1e3..40938e335e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -72,7 +72,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
// start stopwatch
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
@@ -97,7 +97,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !this.hasNextWithTimer();
}
@@ -106,7 +106,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
child.close();
}
- protected abstract boolean calculateNextAggregationResult();
+ protected abstract boolean calculateNextAggregationResult() throws Exception;
protected abstract void updateResultTsBlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 3fb0dca85a..3e8871da00 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -71,12 +71,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
}
@Override
- protected boolean calculateNextAggregationResult() {
+ protected boolean calculateNextAggregationResult() throws Exception {
if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
// move to next time window
curTimeRange = timeRangeIterator.nextTimeRange();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 16c190a25e..ae61601063 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -64,7 +64,7 @@ public class SortOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock tsBlock = inputOperator.nextWithTimer();
if (tsBlock == null) {
return null;
@@ -103,7 +103,7 @@ public class SortOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return inputOperator.hasNextWithTimer();
}
@@ -113,7 +113,7 @@ public class SortOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return cachedData == null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
index 9cc3e37a63..0360028c2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -81,7 +81,7 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
while (System.nanoTime() - start < maxRuntime && !tsBlockBuilder.isFull()) {
@@ -145,12 +145,12 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return !isEmpty(readyChildIndex) || children.get(readyChildIndex).hasNextWithTimer();
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !this.hasNextWithTimer();
}
@@ -176,7 +176,7 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator {
}
@Override
- protected TsBlock getNextTsBlock(int childIndex) {
+ protected TsBlock getNextTsBlock(int childIndex) throws Exception {
consumedIndices[childIndex] = 0;
return children.get(childIndex).nextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 9f0a0113f5..751c121911 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -140,8 +140,7 @@ public class TransformOperator implements ProcessOperator {
}
}
- protected YieldableState iterateAllColumnsToNextValid()
- throws QueryProcessException, IOException {
+ protected YieldableState iterateAllColumnsToNextValid() throws Exception {
for (int i = 0, n = shouldIterateReadersToNextValid.length; i < n; ++i) {
if (shouldIterateReadersToNextValid[i]) {
final YieldableState yieldableState = iterateReaderToNextValid(transformers[i]);
@@ -154,8 +153,7 @@ public class TransformOperator implements ProcessOperator {
return YieldableState.YIELDABLE;
}
- protected YieldableState iterateReaderToNextValid(LayerPointReader reader)
- throws QueryProcessException, IOException {
+ protected YieldableState iterateReaderToNextValid(LayerPointReader reader) throws Exception {
// Since a constant operand is not allowed to be a result column, the reader will not be
// a ConstantLayerPointReader.
// If keepNull is false, we must iterate the reader until a non-null row is returned.
@@ -172,7 +170,7 @@ public class TransformOperator implements ProcessOperator {
}
@Override
- public final boolean hasNext() {
+ public final boolean hasNext() throws Exception {
if (!timeHeap.isEmpty()) {
return true;
}
@@ -188,7 +186,7 @@ public class TransformOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
try {
YieldableState yieldableState = iterateAllColumnsToNextValid();
@@ -255,7 +253,7 @@ public class TransformOperator implements ProcessOperator {
}
protected boolean collectReaderAppendIsNull(LayerPointReader reader, long currentTime)
- throws QueryProcessException, IOException {
+ throws Exception {
final YieldableState yieldableState = reader.yield();
if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
@@ -275,7 +273,7 @@ public class TransformOperator implements ProcessOperator {
protected YieldableState collectDataPoint(
LayerPointReader reader, ColumnBuilder writer, long currentTime, int readerIndex)
- throws QueryProcessException, IOException {
+ throws Exception {
final YieldableState yieldableState = reader.yield();
if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
writer.appendNull();
@@ -336,7 +334,7 @@ public class TransformOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
// call hasNext first, or data of inputOperator could be missing
boolean flag = !hasNextWithTimer();
return timeHeap.isEmpty() && (flag || inputOperator.isFinished());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
index dadd3760af..f26fb7dfb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
@@ -63,7 +63,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (!prepareInput()) {
return null;
}
@@ -106,7 +106,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (finished) {
return false;
}
@@ -121,7 +121,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
if (finished) {
return true;
}
@@ -173,7 +173,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
}
@Override
- protected TsBlock getNextTsBlock(int childIndex) {
+ protected TsBlock getNextTsBlock(int childIndex) throws Exception {
inputIndex[childIndex] = 0;
return children.get(childIndex).nextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index aca522cc57..0cc36c5e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -120,7 +120,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (retainedTsBlock != null) {
return getResultFromRetainedTsBlock();
}
@@ -184,7 +184,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (finished) {
return false;
}
@@ -207,7 +207,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
if (finished) {
return true;
}
@@ -269,7 +269,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
* some children is blocked or return null.
*/
@Override
- protected boolean prepareInput() {
+ protected boolean prepareInput() throws Exception {
boolean allReady = true;
for (int i = 0; i < inputOperatorsCount; i++) {
if (noMoreTsBlocks[i] || !isEmpty(i)) {
@@ -288,6 +288,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
noMoreTsBlocks[i] = true;
inputTsBlocks[i] = null;
}
+
} else {
allReady = false;
}
@@ -301,7 +302,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
}
@Override
- protected TsBlock getNextTsBlock(int childIndex) {
+ protected TsBlock getNextTsBlock(int childIndex) throws Exception {
inputIndex[childIndex] = 0;
return children.get(childIndex).nextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index f945b700e4..2b60ad6d3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -119,7 +119,7 @@ public class TimeJoinOperator extends AbstractOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (retainedTsBlock != null) {
return getResultFromRetainedTsBlock();
}
@@ -147,7 +147,8 @@ public class TimeJoinOperator extends AbstractOperator {
// In such case, TimeJoinOperator can't go on calculating, so we just return null.
// We can also use the while loop here to continuously call the hasNext() and next()
// methods of the child operator until its hasNext() returns false or the next() gets
- // the data that is not empty, but this will cause the execution time of the while loop
+ // the data that is not empty, but this will cause the execution time of the while
+ // loop
// to be uncontrollable and may exceed all allocated time slice
return null;
}
@@ -198,7 +199,7 @@ public class TimeJoinOperator extends AbstractOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (finished) {
return false;
}
@@ -228,7 +229,7 @@ public class TimeJoinOperator extends AbstractOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
if (finished) {
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index 60095e9365..f0b1ffc49a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -81,12 +81,12 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return child.hasNextWithTimer();
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return child.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
index f78c19e4fe..a75581eb03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -50,7 +50,7 @@ public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOpera
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock res = child.nextWithTimer();
if (res == null) {
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index b69ecfa388..199d12b467 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -60,7 +60,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (children.get(currentIndex).hasNextWithTimer()) {
return children.get(currentIndex).nextWithTimer();
} else {
@@ -70,7 +70,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return currentIndex < inputOperatorsCount;
}
@@ -82,7 +82,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index fcc074b024..af4517d246 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -104,7 +104,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
// end time series for returned TsBlock this time, it's the min/max end time series among all
// the children
@@ -133,10 +133,12 @@ public class LastQueryMergeOperator implements ProcessOperator {
} else {
// child operator has next but return an empty TsBlock which means that it may not
// finish calculation in given time slice.
- // In such case, LastQueryMergeOperator can't go on calculating, so we just return null.
+ // In such case, LastQueryMergeOperator can't go on calculating, so we just return
+ // null.
// We can also use the while loop here to continuously call the hasNext() and next()
// methods of the child operator until its hasNext() returns false or the next() gets
- // the data that is not empty, but this will cause the execution time of the while loop
+ // the data that is not empty, but this will cause the execution time of the while
+ // loop
// to be uncontrollable and may exceed all allocated time slice
return null;
}
@@ -179,7 +181,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (finished) {
return false;
}
@@ -207,7 +209,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
if (finished) {
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index c0f825e02a..db4a7e95dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -85,7 +85,7 @@ public class LastQueryOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
// we have consumed up data from children Operator, just return all remaining cached data in
// tsBlockBuilder
@@ -112,6 +112,7 @@ public class LastQueryOperator implements ProcessOperator {
LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
}
}
+
currentIndex++;
}
@@ -121,12 +122,12 @@ public class LastQueryOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return currentIndex < inputOperatorsCount || !tsBlockBuilder.isEmpty();
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 09779cf106..5a4fbd7c0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -110,7 +110,7 @@ public class LastQuerySortOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
// we have consumed up data from children Operator, just return all remaining cached data in
// cachedTsBlock, tsBlockBuilder and previousTsBlock
if (currentIndex >= inputOperatorsCount) {
@@ -152,6 +152,7 @@ public class LastQuerySortOperator implements ProcessOperator {
return null;
}
}
+
currentIndex++;
}
if (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
@@ -169,7 +170,7 @@ public class LastQuerySortOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return currentIndex < inputOperatorsCount
|| cachedTsBlockRowIndex < cachedTsBlockSize
|| !tsBlockBuilder.isEmpty()
@@ -185,7 +186,7 @@ public class LastQuerySortOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 69972a40e2..7bcc5a61e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -52,7 +52,7 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock res = child.nextWithTimer();
if (res == null) {
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java
index c70264a549..a1ec71f1a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java
@@ -85,7 +85,7 @@ public class CountGroupByLevelMergeOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -146,12 +146,12 @@ public class CountGroupByLevelMergeOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return resultTsBlockList == null || currentIndex < resultTsBlockList.size();
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
index 0e3746e7b3..f852492778 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
@@ -80,7 +80,7 @@ public class CountGroupByLevelScanOperator<T extends ISchemaInfo> implements Sou
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -88,7 +88,7 @@ public class CountGroupByLevelScanOperator<T extends ISchemaInfo> implements Sou
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (schemaReader == null) {
schemaReader = createTimeSeriesReader();
}
@@ -140,7 +140,7 @@ public class CountGroupByLevelScanOperator<T extends ISchemaInfo> implements Sou
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index 5042ab3460..ba62cb6d01 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -76,10 +76,11 @@ public class CountMergeOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (!hasNext()) {
throw new NoSuchElementException();
}
+
if (resultTsBlockList != null) {
currentIndex++;
return resultTsBlockList.get(currentIndex - 1);
@@ -122,12 +123,12 @@ public class CountMergeOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return resultTsBlockList == null || currentIndex < resultTsBlockList.size();
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index 7124b6d27c..19e89ea1f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -74,7 +74,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (isReadingMemory) {
isReadingMemory = false;
return transferToTsBlock(data);
@@ -125,7 +125,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return isReadingMemory || child.hasNextWithTimer();
}
@@ -135,7 +135,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !isReadingMemory && child.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 4d04a62e86..10b1d2bf30 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -69,7 +69,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock block = child.nextWithTimer();
if (block == null || block.isEmpty()) {
return null;
@@ -94,7 +94,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return child.hasNextWithTimer();
}
@@ -104,7 +104,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return child.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 3a662afe13..a68cc9b6b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -64,7 +64,7 @@ public class NodePathsCountOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
while (!child.isFinished()) {
// read as much child result as possible
ListenableFuture<?> blocked = child.isBlocked();
@@ -91,7 +91,7 @@ public class NodePathsCountOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return !child.isFinished() || !isFinished;
}
@@ -106,7 +106,7 @@ public class NodePathsCountOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return isFinished;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
index 5e313e9fea..d91caee311 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
@@ -69,7 +69,7 @@ public class SchemaCountOperator<T extends ISchemaInfo> implements SourceOperato
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -94,12 +94,12 @@ public class SchemaCountOperator<T extends ISchemaInfo> implements SourceOperato
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return !isFinished;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return isFinished;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 977cd388df..6be4893d4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -66,7 +66,7 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (isReadingStorageGroupInfo) {
isReadingStorageGroupInfo = false;
return generateStorageGroupInfo();
@@ -81,7 +81,7 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return isReadingStorageGroupInfo || currentIndex < childrenCount;
}
@@ -93,7 +93,7 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index 0f406e1385..79ee0c381e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -81,7 +81,7 @@ public class SchemaFetchScanOperator implements SourceOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -95,12 +95,12 @@ public class SchemaFetchScanOperator implements SourceOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return !isFinished;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return isFinished;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index 0ad36268a5..58ce3b2d2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -50,7 +50,7 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (children.get(currentIndex).hasNextWithTimer()) {
return children.get(currentIndex).nextWithTimer();
} else {
@@ -60,7 +60,7 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return currentIndex < children.size();
}
@@ -70,7 +70,7 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 5fc7a1b681..64ba3314e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -70,7 +70,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -187,7 +187,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return resultTsBlockList == null || currentIndex < resultTsBlockList.size();
}
@@ -199,7 +199,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index ee27182135..2c261dbb2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -126,7 +126,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -146,7 +146,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (schemaReader == null) {
schemaReader = createSchemaReader();
}
@@ -154,7 +154,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
index 01ecb7f71b..e4b44508d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
import java.util.List;
@@ -54,13 +55,22 @@ public class IdentitySinkOperator implements Operator {
}
@Override
- public boolean hasNext() {
- if (children.get(downStreamChannelIndex.getCurrentIndex()).hasNext()) {
+ public boolean hasNext() throws Exception {
+ int currentIndex = downStreamChannelIndex.getCurrentIndex();
+ boolean currentChannelClosed = sinkHandle.isChannelClosed(currentIndex);
+ if (!currentChannelClosed && children.get(currentIndex).hasNext()) {
return true;
+ } else if (currentChannelClosed) {
+ // we close the child directly. The child could be an ExchangeOperator which is the downstream
+ // of an ISinkChannel of a pipeline driver.
+ closeCurrentChild(currentIndex);
+ } else {
+ // current child has no more data
+ closeCurrentChild(currentIndex);
+ sinkHandle.setNoMoreTsBlocksOfOneChannel(downStreamChannelIndex.getCurrentIndex());
}
- int currentIndex = downStreamChannelIndex.getCurrentIndex();
- // current channel have no more data
- sinkHandle.setNoMoreTsBlocksOfOneChannel(downStreamChannelIndex.getCurrentIndex());
+
+ // increment the index
currentIndex++;
if (currentIndex >= children.size()) {
isFinished = true;
@@ -76,8 +86,13 @@ public class IdentitySinkOperator implements Operator {
return true;
}
+ private void closeCurrentChild(int index) throws Exception {
+ children.get(index).close();
+ children.set(index, null);
+ }
+
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (needToReturnNull) {
needToReturnNull = false;
return null;
@@ -91,7 +106,7 @@ public class IdentitySinkOperator implements Operator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return isFinished;
}
@@ -102,8 +117,9 @@ public class IdentitySinkOperator implements Operator {
@Override
public void close() throws Exception {
- for (Operator child : children) {
- child.close();
+ for (int i = downStreamChannelIndex.getCurrentIndex(), n = children.size(); i < n; i++) {
+ Validate.notNull(children.get(i));
+ children.get(i).close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
index 2e7863eb0f..1a8790b661 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
@@ -59,13 +59,20 @@ public class ShuffleHelperOperator implements Operator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
int currentIndex = downStreamChannelIndex.getCurrentIndex();
- if (children.get(currentIndex).hasNext()) {
+ boolean currentChannelClosed = sinkHandle.isChannelClosed(currentIndex);
+ if (!currentChannelClosed && children.get(currentIndex).hasNext()) {
return true;
+ } else if (currentChannelClosed) {
+ // we close the child directly. The child could be an ExchangeOperator which is the downstream
+ // of an ISinkChannel of a pipeline driver.
+ closeCurrentChild(currentIndex);
+ } else {
+ // current channel has no more data
+ closeCurrentChild(currentIndex);
+ sinkHandle.setNoMoreTsBlocksOfOneChannel(currentIndex);
}
- // current channel have no more data
- sinkHandle.setNoMoreTsBlocksOfOneChannel(currentIndex);
unfinishedChildren.remove(currentIndex);
currentIndex = (currentIndex + 1) % children.size();
downStreamChannelIndex.setCurrentIndex(currentIndex);
@@ -78,8 +85,13 @@ public class ShuffleHelperOperator implements Operator {
return true;
}
+ private void closeCurrentChild(int index) throws Exception {
+ children.get(index).close();
+ children.set(index, null);
+ }
+
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (needToReturnNull) {
needToReturnNull = false;
return null;
@@ -89,11 +101,20 @@ public class ShuffleHelperOperator implements Operator {
@Override
public ListenableFuture<?> isBlocked() {
- return children.get(downStreamChannelIndex.getCurrentIndex()).isBlocked();
+ int steps = 0;
+ int currentIndex = downStreamChannelIndex.getCurrentIndex();
+ // skip closed children
+ while (children.get(currentIndex) == null && steps < children.size()) {
+ currentIndex = (currentIndex + 1) % children.size();
+ steps++;
+ }
+ downStreamChannelIndex.setCurrentIndex(currentIndex);
+ Operator child = children.get(currentIndex);
+ return child == null ? NOT_BLOCKED : child.isBlocked();
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return unfinishedChildren.isEmpty();
}
@@ -105,7 +126,9 @@ public class ShuffleHelperOperator implements Operator {
@Override
public void close() throws Exception {
for (Operator child : children) {
- child.close();
+ if (child != null) {
+ child.close();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index afb2957f93..67b42a4319 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -113,12 +113,12 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return timeRangeIterator.hasNextTimeRange();
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
// start stopwatch
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
@@ -148,7 +148,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return finished || (finished = !hasNextWithTimer());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 47830d42ed..ebc407ea97 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -62,7 +62,7 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (retainedTsBlock != null) {
return getResultFromRetainedTsBlock();
}
@@ -72,7 +72,7 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (retainedTsBlock != null) {
return true;
}
@@ -117,7 +117,7 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return finished;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 8699c3e135..27ef9a3022 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -69,17 +69,17 @@ public class ExchangeOperator implements SourceOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
return sourceHandle.receive();
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return !sourceHandle.isFinished();
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return sourceHandle.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index 6c5185fbf2..eb975d3469 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -41,19 +41,19 @@ public class LastCacheScanOperator implements SourceOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock res = tsBlock;
tsBlock = null;
return res;
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return tsBlock != null && !tsBlock.isEmpty();
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNextWithTimer();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 4032dea5e2..2a315920f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -129,7 +129,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
if (retainedTsBlock != null) {
return getResultFromRetainedTsBlock();
}
@@ -139,7 +139,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (retainedTsBlock != null) {
return true;
}
@@ -184,7 +184,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return finished;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
index 0a9c86d0c2..ef930f8400 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
@@ -60,7 +60,7 @@ public class ShowQueriesOperator implements SourceOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlock res = tsBlock;
hasConsumed = true;
tsBlock = null;
@@ -68,7 +68,7 @@ public class ShowQueriesOperator implements SourceOperator {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
if (hasConsumed) {
return false;
}
@@ -79,7 +79,7 @@ public class ShowQueriesOperator implements SourceOperator {
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return hasConsumed;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/api/YieldableReader.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/api/YieldableReader.java
index 263254a5f3..f61dd44114 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/api/YieldableReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/api/YieldableReader.java
@@ -19,11 +19,7 @@
package org.apache.iotdb.db.mpp.transformation.api;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-
-import java.io.IOException;
-
public interface YieldableReader {
- YieldableState yield() throws IOException, QueryProcessException;
+ YieldableState yield() throws Exception;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/IUDFInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/IUDFInputDataSet.java
index 4f640e384c..db1272cffd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/IUDFInputDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/IUDFInputDataSet.java
@@ -35,7 +35,7 @@ public interface IUDFInputDataSet {
boolean hasNextRowInObjects() throws IOException;
/** Whether the data set has next row. */
- default YieldableState canYieldNextRowInObjects() throws IOException {
+ default YieldableState canYieldNextRowInObjects() throws Exception {
return hasNextRowInObjects()
? YieldableState.YIELDABLE
: YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java
index 1aadf50b09..7825b2f423 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java
@@ -113,7 +113,7 @@ public class QueryDataSetInputLayer {
}
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (hasCachedRowRecord) {
return YieldableState.YIELDABLE;
}
@@ -231,7 +231,7 @@ public class QueryDataSetInputLayer {
private class TimePointReader extends AbstractLayerPointReader {
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (hasCachedRowRecord) {
return YieldableState.YIELDABLE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
index 8f6629aba3..9bb3593441 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
@@ -50,7 +50,7 @@ public class TsBlockInputDataSet implements IUDFInputDataSet {
}
@Override
- public YieldableState canYieldNextRowInObjects() {
+ public YieldableState canYieldNextRowInObjects() throws Exception {
if (tsBlockRowIterator == null) {
if (operator.isBlocked() != Operator.NOT_BLOCKED) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
index b2c9c731f0..9c16557c5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
@@ -165,7 +165,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
}
@Override
- public YieldableState canYieldNextRowInObjects() throws IOException {
+ public YieldableState canYieldNextRowInObjects() throws Exception {
if (cachedRow != null) {
return YieldableState.YIELDABLE;
}
@@ -301,7 +301,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
private boolean currentNull = false;
@Override
- public YieldableState yield() throws IOException {
+ public YieldableState yield() throws Exception {
if (hasCached) {
return YieldableState.YIELDABLE;
}
@@ -384,7 +384,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
private int beginIndex = -slidingStep;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (hasCached) {
return YieldableState.YIELDABLE;
}
@@ -517,7 +517,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
private int nextIndexBegin = 0;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (isFirstIteration) {
if (rowRecordList.size() == 0 && nextWindowTimeBegin == Long.MIN_VALUE) {
final YieldableState yieldableState =
@@ -694,7 +694,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
private int nextIndexEnd = 1;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (isFirstIteration) {
if (rowRecordList.size() == 0) {
final YieldableState yieldableState =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 165a079120..38e474aaba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -90,7 +90,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
}
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (hasCached) {
return YieldableState.YIELDABLE;
}
@@ -192,7 +192,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
private int currentRowIndex = -1;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (hasCached) {
return YieldableState.YIELDABLE;
}
@@ -273,7 +273,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
private int beginIndex = -slidingStep;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (hasCached) {
return YieldableState.YIELDABLE;
}
@@ -402,7 +402,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
private boolean hasAtLeastOneRow;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (isFirstIteration) {
if (tvList.size() == 0) {
final YieldableState yieldableState =
@@ -583,7 +583,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
private int nextIndexEnd = 1;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (isFirstIteration) {
if (tvList.size() == 0) {
final YieldableState yieldableState =
@@ -701,7 +701,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
private ValueRecorder valueRecorder = new ValueRecorder();
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (isFirstIteration) {
if (tvList.size() == 0) {
final YieldableState yieldableState =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index d6bd270c41..0ff38822d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -78,7 +78,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
private boolean isCurrentNull = false;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (!hasCached) {
final YieldableState yieldableState = parentLayerPointReader.yield();
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
@@ -151,7 +151,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
private int beginIndex = -slidingStep;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (hasCached) {
return YieldableState.YIELDABLE;
}
@@ -277,7 +277,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
private int nextIndexBegin = 0;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (isFirstIteration) {
if (tvList.size() == 0) {
final YieldableState yieldableState =
@@ -457,7 +457,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
private int nextIndexEnd = 1;
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (isFirstIteration) {
if (tvList.size() == 0) {
final YieldableState yieldableState =
@@ -575,7 +575,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
private ValueRecorder valueRecorder = new ValueRecorder();
@Override
- public YieldableState yield() throws IOException, QueryProcessException {
+ public YieldableState yield() throws Exception {
if (isFirstIteration) {
if (tvList.size() == 0) {
final YieldableState yieldableState =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java
index 28fb5f9cae..d99a5c8404 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java
@@ -56,7 +56,7 @@ public abstract class Transformer implements LayerPointReader {
protected abstract boolean cacheValue() throws QueryProcessException, IOException;
@Override
- public final YieldableState yield() throws IOException, QueryProcessException {
+ public final YieldableState yield() throws Exception {
if (hasCachedValue) {
return YieldableState.YIELDABLE;
}
@@ -71,7 +71,7 @@ public abstract class Transformer implements LayerPointReader {
/**
* if this method returns YieldableState.YIELDABLE, at least one of the cached field should be set
*/
- protected abstract YieldableState yieldValue() throws QueryProcessException, IOException;
+ protected abstract YieldableState yieldValue() throws Exception;
@Override
public final void readyForNext() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/BinaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/BinaryTransformer.java
index 761c78fcc7..fbfaad56bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/BinaryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/BinaryTransformer.java
@@ -59,7 +59,7 @@ public abstract class BinaryTransformer extends Transformer {
}
@Override
- public YieldableState yieldValue() throws IOException, QueryProcessException {
+ public YieldableState yieldValue() throws Exception {
final YieldableState leftYieldableState = leftPointReader.yield();
final YieldableState rightYieldableState = rightPointReader.yield();
@@ -89,7 +89,7 @@ public abstract class BinaryTransformer extends Transformer {
return YieldableState.YIELDABLE;
}
- private YieldableState yieldTime() throws IOException, QueryProcessException {
+ private YieldableState yieldTime() throws Exception {
if (isCurrentConstant) {
return YieldableState.YIELDABLE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/LogicBinaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/LogicBinaryTransformer.java
index 6440e39585..839f729cf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/LogicBinaryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/LogicBinaryTransformer.java
@@ -43,7 +43,7 @@ public abstract class LogicBinaryTransformer extends BinaryTransformer {
}
@Override
- public YieldableState yieldValue() throws QueryProcessException, IOException {
+ public YieldableState yieldValue() throws Exception {
final YieldableState leftYieldableState = leftPointReader.yield();
final YieldableState rightYieldableState = rightPointReader.yield();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/MappableUDFQueryRowTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/MappableUDFQueryRowTransformer.java
index c783d32294..7be1849b9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/MappableUDFQueryRowTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/MappableUDFQueryRowTransformer.java
@@ -63,7 +63,7 @@ public class MappableUDFQueryRowTransformer extends UDFQueryTransformer {
}
@Override
- protected YieldableState yieldValue() throws QueryProcessException, IOException {
+ protected YieldableState yieldValue() throws Exception {
final YieldableState yieldableState = layerRowReader.yield();
if (!YieldableState.YIELDABLE.equals(yieldableState)) {
return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowTransformer.java
index 9128d9a1c6..4fc9953d28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowTransformer.java
@@ -36,7 +36,7 @@ public class UDFQueryRowTransformer extends UniversalUDFQueryTransformer {
}
@Override
- protected YieldableState tryExecuteUDFOnce() throws QueryProcessException, IOException {
+ protected YieldableState tryExecuteUDFOnce() throws Exception {
final YieldableState yieldableState = layerRowReader.yield();
if (yieldableState != YieldableState.YIELDABLE) {
return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowWindowTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowWindowTransformer.java
index 9776390953..fb045b710f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowWindowTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowWindowTransformer.java
@@ -37,7 +37,7 @@ public class UDFQueryRowWindowTransformer extends UniversalUDFQueryTransformer {
}
@Override
- protected YieldableState tryExecuteUDFOnce() throws QueryProcessException, IOException {
+ protected YieldableState tryExecuteUDFOnce() throws Exception {
final YieldableState yieldableState = layerRowWindowReader.yield();
if (yieldableState != YieldableState.YIELDABLE) {
return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java
index d19f0d2740..4e60a3e4b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java
@@ -39,7 +39,7 @@ public abstract class UniversalUDFQueryTransformer extends UDFQueryTransformer {
}
@Override
- protected final YieldableState yieldValue() throws QueryProcessException, IOException {
+ protected final YieldableState yieldValue() throws Exception {
while (!cacheValueFromUDFOutput()) {
final YieldableState udfYieldableState = tryExecuteUDFOnce();
if (udfYieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
@@ -62,7 +62,7 @@ public abstract class UniversalUDFQueryTransformer extends UDFQueryTransformer {
return true;
}
- protected abstract YieldableState tryExecuteUDFOnce() throws QueryProcessException, IOException;
+ protected abstract YieldableState tryExecuteUDFOnce() throws Exception;
protected abstract boolean executeUDFOnce() throws QueryProcessException, IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/TernaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/TernaryTransformer.java
index 1e72272cfc..07c7d6911f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/TernaryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/TernaryTransformer.java
@@ -45,7 +45,7 @@ public abstract class TernaryTransformer extends Transformer {
protected final boolean isCurrentConstant;
@Override
- protected YieldableState yieldValue() throws QueryProcessException, IOException {
+ protected YieldableState yieldValue() throws Exception {
final YieldableState firstYieldableState = firstPointReader.yield();
final YieldableState secondYieldableState = secondPointReader.yield();
final YieldableState thirdYieldableState = thirdPointReader.yield();
@@ -81,7 +81,7 @@ public abstract class TernaryTransformer extends Transformer {
return YieldableState.YIELDABLE;
}
- private YieldableState yieldTime() throws IOException, QueryProcessException {
+ private YieldableState yieldTime() throws Exception {
if (isCurrentConstant) {
return YieldableState.YIELDABLE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/IsNullTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/IsNullTransformer.java
index f772276566..56281a315c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/IsNullTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/IsNullTransformer.java
@@ -40,7 +40,7 @@ public class IsNullTransformer extends UnaryTransformer {
}
@Override
- public final YieldableState yieldValue() throws IOException, QueryProcessException {
+ public final YieldableState yieldValue() throws Exception {
final YieldableState yieldableState = layerPointReader.yield();
if (!YieldableState.YIELDABLE.equals(yieldableState)) {
return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/UnaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/UnaryTransformer.java
index 65d6a52776..72079689cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/UnaryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/UnaryTransformer.java
@@ -45,7 +45,7 @@ public abstract class UnaryTransformer extends Transformer {
}
@Override
- public YieldableState yieldValue() throws IOException, QueryProcessException {
+ public YieldableState yieldValue() throws Exception {
final YieldableState yieldableState = layerPointReader.yield();
if (!YieldableState.YIELDABLE.equals(yieldableState)) {
return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/scalar/DiffFunctionTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/scalar/DiffFunctionTransformer.java
index 476b71b4c6..a237a19147 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/scalar/DiffFunctionTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/scalar/DiffFunctionTransformer.java
@@ -47,7 +47,7 @@ public class DiffFunctionTransformer extends UnaryTransformer {
}
@Override
- public final YieldableState yieldValue() throws IOException, QueryProcessException {
+ public final YieldableState yieldValue() throws Exception {
final YieldableState yieldableState = layerPointReader.yield();
if (!YieldableState.YIELDABLE.equals(yieldableState)) {
return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/LayerCacheUtils.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/LayerCacheUtils.java
index 791628ec7b..2725af36a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/LayerCacheUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/LayerCacheUtils.java
@@ -39,7 +39,7 @@ public class LayerCacheUtils {
LayerPointReader source,
ElasticSerializableTVList target,
int pointNumber)
- throws QueryProcessException, IOException {
+ throws Exception {
int count = 0;
while (count < pointNumber) {
final YieldableState yieldableState = yieldPoint(dataType, source, target);
@@ -53,7 +53,7 @@ public class LayerCacheUtils {
public static YieldableState yieldPoint(
TSDataType dataType, LayerPointReader source, ElasticSerializableTVList target)
- throws IOException, QueryProcessException {
+ throws Exception {
final YieldableState yieldableState = source.yield();
if (yieldableState != YieldableState.YIELDABLE) {
return yieldableState;
@@ -96,7 +96,7 @@ public class LayerCacheUtils {
/** @return number of actually collected, which may be less than or equals to rowsNumber */
public static YieldableState yieldRows(
IUDFInputDataSet source, ElasticSerializableRowRecordList target, int rowsNumber)
- throws QueryProcessException, IOException {
+ throws Exception {
int count = 0;
while (count < rowsNumber) {
final YieldableState yieldableState = yieldRow(source, target);
@@ -109,9 +109,9 @@ public class LayerCacheUtils {
}
public static YieldableState yieldRow(
- IUDFInputDataSet source, ElasticSerializableRowRecordList target)
- throws IOException, QueryProcessException {
+ IUDFInputDataSet source, ElasticSerializableRowRecordList target) throws Exception {
final YieldableState yieldableState = source.canYieldNextRowInObjects();
+
if (yieldableState == YieldableState.YIELDABLE) {
target.put(source.nextRowInObjects());
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSink.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSink.java
index 2ccc9bf8ee..7b28462fe6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSink.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSink.java
@@ -73,6 +73,11 @@ public class StubSink implements ISink {
instanceContext.transitionToFlushing();
}
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
@Override
public boolean isAborted() {
return closed;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index ca119a07ee..0147d33c3a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -59,7 +59,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -97,8 +96,7 @@ public class AggregationOperatorTest {
/** Try to aggregate unary intermediate result of one time series without group by interval. */
@Test
- public void testAggregateIntermediateResult1()
- throws IllegalPathException, ExecutionException, InterruptedException {
+ public void testAggregateIntermediateResult1() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.COUNT);
aggregationTypes.add(TAggregationType.SUM);
@@ -122,7 +120,9 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = null;
+ resultTsBlock = aggregationOperator.next();
+
if (resultTsBlock == null) {
continue;
}
@@ -139,8 +139,7 @@ public class AggregationOperatorTest {
/** Try to aggregate binary intermediate result of one time series without group by interval. */
@Test
- public void testAggregateIntermediateResult2()
- throws IllegalPathException, ExecutionException, InterruptedException {
+ public void testAggregateIntermediateResult2() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.AVG);
aggregationTypes.add(TAggregationType.FIRST_VALUE);
@@ -163,7 +162,8 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = null;
+ resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -176,8 +176,7 @@ public class AggregationOperatorTest {
}
@Test
- public void testGroupByIntermediateResult1()
- throws IllegalPathException, ExecutionException, InterruptedException {
+ public void testGroupByIntermediateResult1() throws Exception {
int[][] result =
new int[][] {
{100, 100, 100, 99},
@@ -211,7 +210,8 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = null;
+ resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -231,8 +231,7 @@ public class AggregationOperatorTest {
}
@Test
- public void testGroupByIntermediateResult2()
- throws IllegalPathException, ExecutionException, InterruptedException {
+ public void testGroupByIntermediateResult2() throws Exception {
double[][] result =
new double[][] {
{20049.5, 20149.5, 6249.5, 8429.808},
@@ -262,7 +261,8 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = null;
+ resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 6d45de1d7f..f5d7e9a003 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -95,7 +95,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+ public void testAggregationWithoutTimeFilter() throws Exception {
List<Aggregator> aggregators = new ArrayList<>();
for (int i = 0; i < measurementSchemas.size(); i++) {
TSDataType dataType = measurementSchemas.get(i).getType();
@@ -126,7 +126,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws IllegalPathException {
+ public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws Exception {
List<Aggregator> aggregators = new ArrayList<>();
for (int i = 0; i < measurementSchemas.size(); i++) {
TSDataType dataType = measurementSchemas.get(i).getType();
@@ -153,11 +153,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testMultiAggregationFuncWithoutTimeFilter1() throws IllegalPathException {
+ public void testMultiAggregationFuncWithoutTimeFilter1() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.COUNT);
aggregationTypes.add(TAggregationType.SUM);
@@ -190,7 +191,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testMultiAggregationFuncWithoutTimeFilter2() throws IllegalPathException {
+ public void testMultiAggregationFuncWithoutTimeFilter2() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.FIRST_VALUE);
aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -231,8 +232,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc()
- throws IllegalPathException {
+ public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.FIRST_VALUE);
aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -273,7 +273,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testAggregationWithTimeFilter1() throws IllegalPathException {
+ public void testAggregationWithTimeFilter1() throws Exception {
List<Aggregator> aggregators = new ArrayList<>();
for (int i = 0; i < measurementSchemas.size(); i++) {
TSDataType dataType = measurementSchemas.get(i).getType();
@@ -301,11 +301,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testAggregationWithTimeFilter2() throws IllegalPathException {
+ public void testAggregationWithTimeFilter2() throws Exception {
Filter timeFilter = TimeFilter.ltEq(379);
List<Aggregator> aggregators = new ArrayList<>();
for (int i = 0; i < measurementSchemas.size(); i++) {
@@ -333,11 +334,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testAggregationWithTimeFilter3() throws IllegalPathException {
+ public void testAggregationWithTimeFilter3() throws Exception {
Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
List<Aggregator> aggregators = new ArrayList<>();
for (int i = 0; i < measurementSchemas.size(); i++) {
@@ -369,7 +371,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testMultiAggregationWithTimeFilter() throws IllegalPathException {
+ public void testMultiAggregationWithTimeFilter() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.FIRST_VALUE);
aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -407,11 +409,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
assertEquals(399, resultTsBlock.getColumn(5).getLong(0));
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
+ public void testGroupByWithoutGlobalTimeFilter() throws Exception {
int[] result = new int[] {100, 100, 100, 99};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
List<Aggregator> aggregators = new ArrayList<>();
@@ -444,11 +447,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
count++;
}
}
+
assertEquals(4, count);
}
@Test
- public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
+ public void testGroupByWithGlobalTimeFilter() throws Exception {
int[] result = new int[] {0, 80, 100, 80};
Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
@@ -487,7 +491,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testGroupByWithMultiFunction() throws IllegalPathException {
+ public void testGroupByWithMultiFunction() throws Exception {
int[][] result =
new int[][] {
{20000, 20100, 10200, 10300},
@@ -530,7 +534,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException {
+ public void testGroupByWithMultiFunctionOrderByTimeDesc() throws Exception {
int[][] result =
new int[][] {
{20000, 20100, 10200, 10300},
@@ -573,7 +577,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testGroupBySlidingTimeWindow() throws IllegalPathException {
+ public void testGroupBySlidingTimeWindow() throws Exception {
int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 49};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
@@ -603,7 +607,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
+ public void testGroupBySlidingTimeWindow2() throws Exception {
int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
@@ -634,7 +638,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
}
@Test
- public void testGroupBySlidingWindowWithMultiFunction() throws IllegalPathException {
+ public void testGroupBySlidingWindowWithMultiFunction() throws Exception {
int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
int[][] result =
new int[][] {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
index 894ef7c8c3..5366412433 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -96,7 +96,7 @@ public class AlignedSeriesScanOperatorTest {
}
@Test
- public void batchTest1() {
+ public void batchTest1() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -173,7 +173,7 @@ public class AlignedSeriesScanOperatorTest {
}
@Test
- public void batchTest2() {
+ public void batchTest2() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -462,7 +462,7 @@ public class AlignedSeriesScanOperatorTest {
/** order by time desc */
@Test
- public void batchTest3() {
+ public void batchTest3() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
deleted file mode 100644
index 9960a40286..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.execution.operator;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import io.airlift.units.Duration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class DeviceMergeOperatorTest {
-
- private static final String DEVICE_MERGE_OPERATOR_TEST_SG = "root.DeviceMergeOperatorTest";
- private final List<String> deviceIds = new ArrayList<>();
- private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
-
- private final List<TsFileResource> seqResources = new ArrayList<>();
- private final List<TsFileResource> unSeqResources = new ArrayList<>();
-
- @Before
- public void setUp() throws MetadataException, IOException, WriteProcessException {
- SeriesReaderTestUtil.setUp(
- measurementSchemas, deviceIds, seqResources, unSeqResources, DEVICE_MERGE_OPERATOR_TEST_SG);
- }
-
- @After
- public void tearDown() throws IOException {
- SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
- }
-
- /**
- * Construct DeviceMergeOperator with different devices in two DeviceViewOperators.
- *
- * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0]],
- *
- * <p>DeviceViewOperator2: [seriesScanOperator: [device1.sensor1]]
- *
- * <p>the result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
- * and the sensor0 column of device1 should be null.
- */
- @Test
- public void deviceMergeOperatorTest() {
- ExecutorService instanceNotificationExecutor =
- IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
- try {
- // Construct operator tree
- QueryId queryId = new QueryId("stub_query");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
- FragmentInstanceStateMachine stateMachine =
- new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
- FragmentInstanceContext fragmentInstanceContext =
- createFragmentInstanceContext(instanceId, stateMachine);
- DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
- PlanNodeId planNodeId1 = new PlanNodeId("1");
- driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId2 = new PlanNodeId("2");
- driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId3 = new PlanNodeId("3");
- driverContext.addOperatorContext(
- 3, planNodeId3, DeviceViewOperatorTest.class.getSimpleName());
- PlanNodeId planNodeId4 = new PlanNodeId("4");
- driverContext.addOperatorContext(
- 4, planNodeId4, DeviceViewOperatorTest.class.getSimpleName());
- PlanNodeId planNodeId5 = new PlanNodeId("5");
- driverContext.addOperatorContext(5, planNodeId5, DeviceMergeOperator.class.getSimpleName());
-
- List<TSDataType> dataTypes = new ArrayList<>();
- dataTypes.add(TSDataType.TEXT);
- dataTypes.add(TSDataType.INT32);
- dataTypes.add(TSDataType.INT32);
- MeasurementPath measurementPath1 =
- new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
-
- SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
- scanOptionsBuilder.withAllSensors(Collections.singleton("sensor0"));
- SeriesScanOperator seriesScanOperator1 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(0),
- planNodeId1,
- measurementPath1,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator1
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- DeviceViewOperator deviceViewOperator1 =
- new DeviceViewOperator(
- driverContext.getOperatorContexts().get(2),
- Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
- Collections.singletonList(seriesScanOperator1),
- Collections.singletonList(Collections.singletonList(1)),
- dataTypes);
-
- MeasurementPath measurementPath2 =
- new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
- scanOptionsBuilder.withAllSensors(Collections.singleton("sensor1"));
- SeriesScanOperator seriesScanOperator2 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(1),
- planNodeId2,
- measurementPath2,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator2
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- DeviceViewOperator deviceViewOperator2 =
- new DeviceViewOperator(
- driverContext.getOperatorContexts().get(3),
- Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1"),
- Collections.singletonList(seriesScanOperator2),
- Collections.singletonList(Collections.singletonList(2)),
- dataTypes);
-
- List<String> devices = new ArrayList<>();
- devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
- devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1");
- List<Operator> deviceOperators = new ArrayList<>();
- deviceOperators.add(deviceViewOperator1);
- deviceOperators.add(deviceViewOperator2);
- DeviceMergeOperator deviceMergeOperator =
- new DeviceMergeOperator(
- driverContext.getOperatorContexts().get(4),
- devices,
- deviceOperators,
- dataTypes,
- new TimeSelector(500, true),
- new AscTimeComparator());
-
- int count = 0;
- while (deviceMergeOperator.hasNext()) {
- TsBlock tsBlock = deviceMergeOperator.next();
- if (tsBlock == null) {
- continue;
- }
- assertEquals(3, tsBlock.getValueColumnCount());
- for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
- long expectedTime = count % 500;
- assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
- assertEquals(
- count < 500
- ? DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"
- : DEVICE_MERGE_OPERATOR_TEST_SG + ".device1",
- tsBlock.getColumn(0).getBinary(i).getStringValue());
- if (expectedTime < 200) {
- if (!tsBlock.getColumn(1).isNull(i)) {
- assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
- assertTrue(tsBlock.getColumn(2).isNull(i));
- } else {
- assertEquals(20000 + expectedTime, tsBlock.getColumn(2).getInt(i));
- }
- } else if (expectedTime < 260
- || (expectedTime >= 300 && expectedTime < 380)
- || expectedTime >= 400) {
- if (!tsBlock.getColumn(1).isNull(i)) {
- assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
- assertTrue(tsBlock.getColumn(2).isNull(i));
- } else {
- assertEquals(10000 + expectedTime, tsBlock.getColumn(2).getInt(i));
- }
- } else {
- if (!tsBlock.getColumn(1).isNull(i)) {
- assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
- assertTrue(tsBlock.getColumn(2).isNull(i));
- } else {
- assertEquals(expectedTime, tsBlock.getColumn(2).getInt(i));
- }
- }
- }
- }
- assertEquals(1000, count);
- } catch (IllegalPathException e) {
- e.printStackTrace();
- fail();
- }
- }
-
- /**
- * Construct DeviceMergeOperator with the same device in two DeviceViewOperators.
- *
- * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0]],
- *
- * <p>DeviceViewOperator2: [seriesScanOperator: [device0.sensor0]]
- *
- * <p>the result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
- * and the sensor0 column of device1 should be null.
- */
- @Test
- public void deviceMergeOperatorTest2() {
- ExecutorService instanceNotificationExecutor =
- IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
- try {
- // Construct operator tree
- QueryId queryId = new QueryId("stub_query");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
- FragmentInstanceStateMachine stateMachine =
- new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
- FragmentInstanceContext fragmentInstanceContext =
- createFragmentInstanceContext(instanceId, stateMachine);
- DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
- PlanNodeId planNodeId1 = new PlanNodeId("1");
- driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId2 = new PlanNodeId("2");
- driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId3 = new PlanNodeId("3");
- driverContext.addOperatorContext(
- 3, planNodeId3, DeviceViewOperatorTest.class.getSimpleName());
- PlanNodeId planNodeId4 = new PlanNodeId("4");
- driverContext.addOperatorContext(
- 4, planNodeId4, DeviceViewOperatorTest.class.getSimpleName());
- PlanNodeId planNodeId5 = new PlanNodeId("5");
- driverContext.addOperatorContext(5, planNodeId5, DeviceMergeOperator.class.getSimpleName());
-
- List<TSDataType> dataTypes = new ArrayList<>();
- dataTypes.add(TSDataType.TEXT);
- dataTypes.add(TSDataType.INT32);
- MeasurementPath measurementPath1 =
- new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
- SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
- scanOptionsBuilder.withAllSensors(Collections.singleton("sensor0"));
- SeriesScanOperator seriesScanOperator1 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(0),
- planNodeId1,
- measurementPath1,
- Ordering.ASC,
- scanOptionsBuilder.build());
- List<TsFileResource> seqResources1 = new ArrayList<>();
- List<TsFileResource> unSeqResources1 = new ArrayList<>();
- seqResources1.add(seqResources.get(0));
- seqResources1.add(seqResources.get(1));
- seqResources1.add(seqResources.get(3));
- unSeqResources1.add(unSeqResources.get(0));
- unSeqResources1.add(unSeqResources.get(1));
- unSeqResources1.add(unSeqResources.get(3));
- unSeqResources1.add(unSeqResources.get(5));
- seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources1, unSeqResources1));
- seriesScanOperator1
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- DeviceViewOperator deviceViewOperator1 =
- new DeviceViewOperator(
- driverContext.getOperatorContexts().get(2),
- Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
- Collections.singletonList(seriesScanOperator1),
- Collections.singletonList(Collections.singletonList(1)),
- dataTypes);
-
- SeriesScanOperator seriesScanOperator2 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(1),
- planNodeId2,
- measurementPath1,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator2
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- List<TsFileResource> seqResources2 = new ArrayList<>();
- List<TsFileResource> unSeqResources2 = new ArrayList<>();
- seqResources2.add(seqResources.get(2));
- seqResources2.add(seqResources.get(4));
- unSeqResources2.add(unSeqResources.get(2));
- unSeqResources2.add(unSeqResources.get(4));
- seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources2, unSeqResources2));
- DeviceViewOperator deviceViewOperator2 =
- new DeviceViewOperator(
- driverContext.getOperatorContexts().get(3),
- Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
- Collections.singletonList(seriesScanOperator2),
- Collections.singletonList(Collections.singletonList(1)),
- dataTypes);
-
- List<String> devices = new ArrayList<>();
- devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
- List<Operator> deviceOperators = new ArrayList<>();
- deviceOperators.add(deviceViewOperator1);
- deviceOperators.add(deviceViewOperator2);
- DeviceMergeOperator deviceMergeOperator =
- new DeviceMergeOperator(
- driverContext.getOperatorContexts().get(4),
- devices,
- deviceOperators,
- dataTypes,
- new TimeSelector(500, true),
- new AscTimeComparator());
-
- int count = 0;
- while (deviceMergeOperator.hasNext()) {
- TsBlock tsBlock = deviceMergeOperator.next();
- if (tsBlock == null) {
- continue;
- }
- assertEquals(2, tsBlock.getValueColumnCount());
- for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
- assertEquals(count, tsBlock.getTimeByIndex(i));
- assertEquals(
- DEVICE_MERGE_OPERATOR_TEST_SG + ".device0",
- tsBlock.getColumn(0).getBinary(i).getStringValue());
- if ((long) count < 200) {
- assertEquals(20000 + (long) count, tsBlock.getColumn(1).getInt(i));
- } else if ((long) count < 260
- || ((long) count >= 300 && (long) count < 380)
- || (long) count >= 400) {
- assertEquals(10000 + (long) count, tsBlock.getColumn(1).getInt(i));
- } else {
- assertEquals(count, tsBlock.getColumn(1).getInt(i));
- }
- }
- }
- assertEquals(500, count);
- } catch (IllegalPathException e) {
- e.printStackTrace();
- fail();
- }
- }
-
- /**
- * Construct DeviceMergeOperator with the same and different device at the same time in two
- * DeviceViewOperators.
- *
- * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0], [device1.sensor1]],
- *
- * <p>DeviceViewOperator2: [seriesScanOperator: [device0.sensor0]]
- *
- * <p>the result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
- * and the sensor0 column of device1 should be null.
- */
- @Test
- public void deviceMergeOperatorTest3() {
- ExecutorService instanceNotificationExecutor =
- IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
- try {
- // Construct operator tree
- QueryId queryId = new QueryId("stub_query");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
- FragmentInstanceStateMachine stateMachine =
- new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
- FragmentInstanceContext fragmentInstanceContext =
- createFragmentInstanceContext(instanceId, stateMachine);
- DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
- PlanNodeId planNodeId1 = new PlanNodeId("1");
- driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId2 = new PlanNodeId("2");
- driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId3 = new PlanNodeId("3");
- driverContext.addOperatorContext(3, planNodeId3, SeriesScanOperator.class.getSimpleName());
- driverContext.addOperatorContext(
- 4, new PlanNodeId("4"), DeviceViewOperatorTest.class.getSimpleName());
- driverContext.addOperatorContext(
- 5, new PlanNodeId("5"), DeviceViewOperatorTest.class.getSimpleName());
- driverContext.addOperatorContext(
- 6, new PlanNodeId("6"), DeviceMergeOperator.class.getSimpleName());
-
- List<TSDataType> dataTypes = new ArrayList<>();
- dataTypes.add(TSDataType.TEXT);
- dataTypes.add(TSDataType.INT32);
- dataTypes.add(TSDataType.INT32);
- MeasurementPath measurementPath1 =
- new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
- SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
- scanOptionsBuilder.withAllSensors(Collections.singleton("sensor0"));
- SeriesScanOperator seriesScanOperator1 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(0),
- planNodeId1,
- measurementPath1,
- Ordering.ASC,
- scanOptionsBuilder.build());
- List<TsFileResource> seqResources1 = new ArrayList<>();
- List<TsFileResource> unSeqResources1 = new ArrayList<>();
- seqResources1.add(seqResources.get(0));
- seqResources1.add(seqResources.get(1));
- seqResources1.add(seqResources.get(3));
- unSeqResources1.add(unSeqResources.get(0));
- unSeqResources1.add(unSeqResources.get(1));
- unSeqResources1.add(unSeqResources.get(3));
- unSeqResources1.add(unSeqResources.get(5));
- seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources1, unSeqResources1));
- seriesScanOperator1
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- MeasurementPath measurementPath2 =
- new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
- scanOptionsBuilder.withAllSensors(Collections.singleton("sensor1"));
- SeriesScanOperator seriesScanOperator2 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(1),
- planNodeId2,
- measurementPath2,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator2
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- List<String> devices = new ArrayList<>();
- devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
- devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1");
- List<Operator> deviceOperators = new ArrayList<>();
- deviceOperators.add(seriesScanOperator1);
- deviceOperators.add(seriesScanOperator2);
- List<List<Integer>> deviceColumnIndex = new ArrayList<>();
- deviceColumnIndex.add(Collections.singletonList(1));
- deviceColumnIndex.add(Collections.singletonList(2));
- DeviceViewOperator deviceViewOperator1 =
- new DeviceViewOperator(
- driverContext.getOperatorContexts().get(3),
- devices,
- deviceOperators,
- deviceColumnIndex,
- dataTypes);
-
- scanOptionsBuilder.withAllSensors(Collections.singleton("sensor0"));
- SeriesScanOperator seriesScanOperator3 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(2),
- planNodeId3,
- measurementPath1,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator3
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- List<TsFileResource> seqResources2 = new ArrayList<>();
- List<TsFileResource> unSeqResources2 = new ArrayList<>();
- seqResources2.add(seqResources.get(2));
- seqResources2.add(seqResources.get(4));
- unSeqResources2.add(unSeqResources.get(2));
- unSeqResources2.add(unSeqResources.get(4));
- seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources2, unSeqResources2));
- DeviceViewOperator deviceViewOperator2 =
- new DeviceViewOperator(
- driverContext.getOperatorContexts().get(4),
- Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
- Collections.singletonList(seriesScanOperator3),
- Collections.singletonList(Collections.singletonList(1)),
- dataTypes);
-
- List<Operator> deviceViewOperators = new ArrayList<>();
- deviceViewOperators.add(deviceViewOperator1);
- deviceViewOperators.add(deviceViewOperator2);
- DeviceMergeOperator deviceMergeOperator =
- new DeviceMergeOperator(
- driverContext.getOperatorContexts().get(5),
- devices,
- deviceViewOperators,
- dataTypes,
- new TimeSelector(500, true),
- new AscTimeComparator());
-
- int count = 0;
- while (deviceMergeOperator.hasNext()) {
- TsBlock tsBlock = deviceMergeOperator.next();
- if (tsBlock == null) {
- continue;
- }
- assertEquals(3, tsBlock.getValueColumnCount());
- for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
- long expectedTime = count % 500;
- assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
- assertEquals(
- count < 500
- ? DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"
- : DEVICE_MERGE_OPERATOR_TEST_SG + ".device1",
- tsBlock.getColumn(0).getBinary(i).getStringValue());
- if (expectedTime < 200) {
- if (!tsBlock.getColumn(1).isNull(i)) {
- assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
- assertTrue(tsBlock.getColumn(2).isNull(i));
- } else {
- assertEquals(20000 + expectedTime, tsBlock.getColumn(2).getInt(i));
- }
- } else if (expectedTime < 260
- || (expectedTime >= 300 && expectedTime < 380)
- || expectedTime >= 400) {
- if (!tsBlock.getColumn(1).isNull(i)) {
- assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
- assertTrue(tsBlock.getColumn(2).isNull(i));
- } else {
- assertEquals(10000 + expectedTime, tsBlock.getColumn(2).getInt(i));
- }
- } else {
- if (!tsBlock.getColumn(1).isNull(i)) {
- assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
- assertTrue(tsBlock.getColumn(2).isNull(i));
- } else {
- assertEquals(expectedTime, tsBlock.getColumn(2).getInt(i));
- }
- }
- }
- }
- assertEquals(1000, count);
- } catch (IllegalPathException e) {
- e.printStackTrace();
- fail();
- }
- }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
index 9e53ab2fe0..b524825fbb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -195,7 +194,7 @@ public class DeviceViewOperatorTest {
}
}
assertEquals(1000, count);
- } catch (IllegalPathException e) {
+ } catch (Exception e) {
e.printStackTrace();
fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
index af84ea03a8..75da328970 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue;
public class FillOperatorTest {
@Test
- public void batchConstantFillTest() {
+ public void batchConstantFillTest() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -80,7 +80,7 @@ public class FillOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
int delta = index * 10000;
TsBlockBuilder builder =
new TsBlockBuilder(
@@ -122,12 +122,12 @@ public class FillOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 3;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return index >= 3;
}
@@ -220,7 +220,7 @@ public class FillOperatorTest {
}
@Test
- public void batchPreviousFillTest() {
+ public void batchPreviousFillTest() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -250,7 +250,7 @@ public class FillOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
int delta = index * 10000;
TsBlockBuilder builder =
new TsBlockBuilder(
@@ -291,12 +291,12 @@ public class FillOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 3;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return index >= 3;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/HorizontallyConcatOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/HorizontallyConcatOperatorTest.java
index 1430fc4a11..15e803e7f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/HorizontallyConcatOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/HorizontallyConcatOperatorTest.java
@@ -92,7 +92,7 @@ public class HorizontallyConcatOperatorTest {
}
@Test
- public void batchTest1() {
+ public void batchTest1() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
index c409373a4d..8a4862fa4a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -63,7 +63,7 @@ public class LastQueryMergeOperatorTest {
}
@Test
- public void testLastQueryMergeOperatorDesc() {
+ public void testLastQueryMergeOperatorDesc() throws Exception {
QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
@@ -104,7 +104,7 @@ public class LastQueryMergeOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(4);
for (int i = timeArray[index].length - 1; i >= 0; i--) {
LastQueryUtil.appendLastValue(
@@ -119,12 +119,12 @@ public class LastQueryMergeOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index >= 0;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNext();
}
@@ -168,7 +168,7 @@ public class LastQueryMergeOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(4);
for (int i = timeArray[index].length - 1; i >= 0; i--) {
LastQueryUtil.appendLastValue(
@@ -183,12 +183,12 @@ public class LastQueryMergeOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index >= 0;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return !hasNext();
}
@@ -241,7 +241,9 @@ public class LastQueryMergeOperatorTest {
int count = timeArray.length - 1;
while (!lastQueryMergeOperator.isFinished()) {
assertTrue(lastQueryMergeOperator.isBlocked().isDone());
- TsBlock result = lastQueryMergeOperator.next();
+ TsBlock result = null;
+ result = lastQueryMergeOperator.next();
+
if (result == null) {
continue;
}
@@ -255,11 +257,12 @@ public class LastQueryMergeOperatorTest {
count--;
}
}
+
assertEquals(-1, count);
}
@Test
- public void testLastQueryMergeOperatorAsc() {
+ public void testLastQueryMergeOperatorAsc() throws Exception {
QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
@@ -300,7 +303,7 @@ public class LastQueryMergeOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(4);
for (int i = 0, size = timeArray[index].length; i < size; i++) {
LastQueryUtil.appendLastValue(
@@ -315,12 +318,13 @@ public class LastQueryMergeOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 2;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
+
return !hasNext();
}
@@ -364,7 +368,7 @@ public class LastQueryMergeOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(4);
for (int i = 0, size = timeArray[index].length; i < size; i++) {
LastQueryUtil.appendLastValue(
@@ -379,12 +383,13 @@ public class LastQueryMergeOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 2;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
+
return !hasNext();
}
@@ -437,7 +442,8 @@ public class LastQueryMergeOperatorTest {
int count = 0;
while (!lastQueryMergeOperator.isFinished()) {
assertTrue(lastQueryMergeOperator.isBlocked().isDone());
- TsBlock result = lastQueryMergeOperator.next();
+ TsBlock result = null;
+ result = lastQueryMergeOperator.next();
if (result == null) {
continue;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index 5d387029ce..e3141272de 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -90,7 +90,7 @@ public class LastQueryOperatorTest {
}
@Test
- public void testLastQueryOperator1() {
+ public void testLastQueryOperator1() throws Exception {
try {
List<Aggregator> aggregators1 = LastQueryUtil.createAggregators(TSDataType.INT32);
MeasurementPath measurementPath1 =
@@ -341,7 +341,7 @@ public class LastQueryOperatorTest {
}
}
- } catch (IllegalPathException e) {
+ } catch (Exception e) {
e.printStackTrace();
fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
index ea669cf1de..76b26258e6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -209,7 +208,7 @@ public class LastQuerySortOperatorTest {
}
}
- } catch (IllegalPathException e) {
+ } catch (Exception e) {
e.printStackTrace();
fail();
}
@@ -343,7 +342,7 @@ public class LastQuerySortOperatorTest {
}
}
- } catch (IllegalPathException e) {
+ } catch (Exception e) {
e.printStackTrace();
fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
index 79a4d56445..a69d41765d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
@@ -86,7 +86,7 @@ public class LimitOperatorTest {
}
@Test
- public void batchTest() {
+ public void batchTest() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 6bac123f24..877510fe2e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -48,7 +48,7 @@ import static org.junit.Assert.assertTrue;
public class LinearFillOperatorTest {
@Test
- public void batchLinearFillTest1() {
+ public void batchLinearFillTest1() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -131,7 +131,7 @@ public class LinearFillOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder =
new TsBlockBuilder(
ImmutableList.of(
@@ -155,12 +155,12 @@ public class LinearFillOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 3;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return index >= 3;
}
@@ -261,7 +261,7 @@ public class LinearFillOperatorTest {
}
@Test
- public void batchLinearFillTest1OrderByDesc() {
+ public void batchLinearFillTest1OrderByDesc() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -344,7 +344,7 @@ public class LinearFillOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder =
new TsBlockBuilder(
ImmutableList.of(
@@ -368,12 +368,12 @@ public class LinearFillOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 3;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return index >= 3;
}
@@ -474,7 +474,7 @@ public class LinearFillOperatorTest {
}
@Test
- public void batchLinearFillTest2() {
+ public void batchLinearFillTest2() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -557,7 +557,7 @@ public class LinearFillOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder =
new TsBlockBuilder(
ImmutableList.of(
@@ -581,12 +581,12 @@ public class LinearFillOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 3;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return index >= 3;
}
@@ -687,7 +687,7 @@ public class LinearFillOperatorTest {
}
@Test
- public void batchLinearFillTest2OrderByDesc() {
+ public void batchLinearFillTest2OrderByDesc() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -770,7 +770,7 @@ public class LinearFillOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder =
new TsBlockBuilder(
ImmutableList.of(
@@ -794,12 +794,12 @@ public class LinearFillOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 3;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return index >= 3;
}
@@ -900,7 +900,7 @@ public class LinearFillOperatorTest {
}
@Test
- public void batchLinearFillTest3() {
+ public void batchLinearFillTest3() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -937,7 +937,7 @@ public class LinearFillOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.FLOAT));
for (int i = 0; i < 1; i++) {
builder.getTimeColumnBuilder().writeLong(i + index);
@@ -955,12 +955,12 @@ public class LinearFillOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 7;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return index >= 7;
}
@@ -1020,7 +1020,7 @@ public class LinearFillOperatorTest {
}
@Test
- public void batchLinearFillTest3OrderByDesc() {
+ public void batchLinearFillTest3OrderByDesc() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -1057,7 +1057,7 @@ public class LinearFillOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.FLOAT));
for (int i = 0; i < 1; i++) {
builder.getTimeColumnBuilder().writeLong(i + (6 - index));
@@ -1075,12 +1075,12 @@ public class LinearFillOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 7;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return index >= 7;
}
@@ -1140,7 +1140,7 @@ public class LinearFillOperatorTest {
}
@Test
- public void batchLinearFillBooleanTest() {
+ public void batchLinearFillBooleanTest() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -1177,7 +1177,7 @@ public class LinearFillOperatorTest {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws Exception {
TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.BOOLEAN));
for (int i = 0; i < 1; i++) {
builder.getTimeColumnBuilder().writeLong(i + index);
@@ -1195,12 +1195,12 @@ public class LinearFillOperatorTest {
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() throws Exception {
return index < 7;
}
@Override
- public boolean isFinished() {
+ public boolean isFinished() throws Exception {
return index >= 7;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index 73e93e6f67..f91bc21956 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -342,7 +342,7 @@ public class MergeSortOperatorTest {
}
@Test
- public void testOrderByTime1() {
+ public void testOrderByTime1() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.ASC, Ordering.ASC);
long lastTime = -1;
int checkDevice = 0;
@@ -385,11 +385,12 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 1500);
}
@Test
- public void testOrderByTime2() {
+ public void testOrderByTime2() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.ASC, Ordering.DESC);
long lastTime = -1;
int checkDevice = 0;
@@ -432,11 +433,12 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 1500);
}
@Test
- public void testOrderByTime3() {
+ public void testOrderByTime3() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.DESC, Ordering.DESC);
long lastTime = Long.MAX_VALUE;
int checkDevice = 0;
@@ -479,11 +481,12 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 1500);
}
@Test
- public void testOrderByTime4() {
+ public void testOrderByTime4() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.DESC, Ordering.ASC);
long lastTime = Long.MAX_VALUE;
int checkDevice = 0;
@@ -526,6 +529,7 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 1500);
}
@@ -835,7 +839,7 @@ public class MergeSortOperatorTest {
}
@Test
- public void testOrderByTime1_2() {
+ public void testOrderByTime1_2() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.ASC, Ordering.ASC);
long lastTime = -1;
int checkDevice = 0;
@@ -874,15 +878,17 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 2000);
}
@Test
- public void testOrderByTime2_2() {
+ public void testOrderByTime2_2() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.ASC, Ordering.DESC);
long lastTime = -1;
int checkDevice = 0;
int count = 0;
+
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
if (tsBlock == null) continue;
@@ -917,11 +923,12 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 2000);
}
@Test
- public void testOrderByTime3_2() {
+ public void testOrderByTime3_2() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.DESC, Ordering.DESC);
long lastTime = Long.MAX_VALUE;
int checkDevice = 0;
@@ -960,11 +967,12 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 2000);
}
@Test
- public void testOrderByTime4_2() {
+ public void testOrderByTime4_2() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.DESC, Ordering.ASC);
long lastTime = Long.MAX_VALUE;
int checkDevice = 0;
@@ -1003,6 +1011,7 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 2000);
}
// ------------------------------------------------------------------------------------------------
@@ -1274,7 +1283,7 @@ public class MergeSortOperatorTest {
}
@Test
- public void testOrderByDevice1() {
+ public void testOrderByDevice1() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.ASC, Ordering.ASC);
long lastTime = -1;
int checkDevice = 0;
@@ -1322,11 +1331,12 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 2000);
}
@Test
- public void testOrderByDevice2() {
+ public void testOrderByDevice2() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.ASC, Ordering.DESC);
long lastTime = -1;
int checkDevice = 0;
@@ -1374,15 +1384,17 @@ public class MergeSortOperatorTest {
}
}
}
+
assertEquals(count, 2000);
}
@Test
- public void testOrderByDevice3() {
+ public void testOrderByDevice3() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.DESC, Ordering.ASC);
long lastTime = Long.MAX_VALUE;
int checkDevice = 0;
int count = 0;
+
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
if (tsBlock == null) continue;
@@ -1430,7 +1442,7 @@ public class MergeSortOperatorTest {
}
@Test
- public void testOrderByDevice4() {
+ public void testOrderByDevice4() throws Exception {
MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.DESC, Ordering.DESC);
long lastTime = Long.MAX_VALUE;
int checkDevice = 0;
@@ -1493,7 +1505,7 @@ public class MergeSortOperatorTest {
// ShowQueriesOperator ShowQueriesOperator
// ------------------------------------------------------------------------------------------------
@Test
- public void mergeSortWithSortOperatorTest() {
+ public void mergeSortWithSortOperatorTest() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
index 16747a70ea..b4a098282e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
@@ -87,7 +87,7 @@ public class OffsetOperatorTest {
}
@Test
- public void batchTest1() {
+ public void batchTest1() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -192,7 +192,7 @@ public class OffsetOperatorTest {
/** offset is 0 in which case we will get all data */
@Test
- public void batchTest2() {
+ public void batchTest2() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -294,7 +294,7 @@ public class OffsetOperatorTest {
/** offset is larger than max row number in which case we will get no data */
@Test
- public void batchTest3() {
+ public void batchTest3() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index aa333914a9..3f3fc62d27 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -114,7 +114,7 @@ public class RawDataAggregationOperatorTest {
* always used with value filter.
*/
@Test
- public void aggregateRawDataTest1() throws IllegalPathException {
+ public void aggregateRawDataTest1() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
List<List<InputLocation[]>> inputLocations = new ArrayList<>();
for (int i = 0; i < 2; i++) {
@@ -165,7 +165,7 @@ public class RawDataAggregationOperatorTest {
* always used with value filter.
*/
@Test
- public void aggregateRawDataTest2() throws IllegalPathException {
+ public void aggregateRawDataTest2() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
List<List<InputLocation[]>> inputLocations = new ArrayList<>();
for (int i = 0; i < 2; i++) {
@@ -209,12 +209,13 @@ public class RawDataAggregationOperatorTest {
}
count++;
}
+
assertEquals(1, count);
}
/** Test aggregating raw data by time interval. */
@Test
- public void groupByTimeRawDataTest1() throws IllegalPathException {
+ public void groupByTimeRawDataTest1() throws Exception {
int[][] result =
new int[][] {
{100, 100, 100, 99},
@@ -247,6 +248,7 @@ public class RawDataAggregationOperatorTest {
initRawDataAggregationOperator(
aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -265,12 +267,13 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(4, count);
}
/** Test aggregating raw data by time interval. */
@Test
- public void groupByTimeRawDataTest2() throws IllegalPathException {
+ public void groupByTimeRawDataTest2() throws Exception {
double[][] result =
new double[][] {
{20049.5, 20149.5, 6249.5, 8429.808},
@@ -305,6 +308,7 @@ public class RawDataAggregationOperatorTest {
initRawDataAggregationOperator(
aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -325,12 +329,13 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(4, count);
}
/** Test by time interval with EndTime */
@Test
- public void groupByTimeRawDataTest3() throws IllegalPathException {
+ public void groupByTimeRawDataTest3() throws Exception {
int[][] result =
new int[][] {
{100, 100, 100, 99},
@@ -367,6 +372,7 @@ public class RawDataAggregationOperatorTest {
initRawDataAggregationOperator(
aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -394,12 +400,13 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(4, count);
}
/** 0 - 99 100 - 199 200 - 299 300 - 399 400 - 499 500 - 599 */
@Test
- public void groupByTimeRawDataTest4() throws IllegalPathException {
+ public void groupByTimeRawDataTest4() throws Exception {
int[][] result =
new int[][] {
{100, 100, 100, 100, 100, 0},
@@ -430,6 +437,7 @@ public class RawDataAggregationOperatorTest {
initRawDataAggregationOperator(
aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -457,6 +465,7 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(6, count);
}
@@ -465,7 +474,7 @@ public class RawDataAggregationOperatorTest {
* 501 - 600
*/
@Test
- public void groupByTimeRawDataTest5() throws IllegalPathException {
+ public void groupByTimeRawDataTest5() throws Exception {
int[][] result =
new int[][] {
{100, 100, 100, 100, 99, 0},
@@ -496,6 +505,7 @@ public class RawDataAggregationOperatorTest {
initRawDataAggregationOperator(
aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -521,12 +531,13 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(6, count);
}
/** 0 - 259 260 - 299 `300 - 499 */
@Test
- public void groupByEventRawDataTest1() throws IllegalPathException {
+ public void groupByEventRawDataTest1() throws Exception {
int[][] result =
new int[][] {
{0, 260, 300},
@@ -567,6 +578,7 @@ public class RawDataAggregationOperatorTest {
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -588,11 +600,12 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(3, count);
}
@Test
- public void groupByEventRawDataTest2() throws IllegalPathException {
+ public void groupByEventRawDataTest2() throws Exception {
int[][] result =
new int[][] {
{4019900, 613770, 11180, 827160, 7790, 1044950},
@@ -631,6 +644,7 @@ public class RawDataAggregationOperatorTest {
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -651,11 +665,12 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(6, count);
}
@Test
- public void groupByEventRawDataTest3() throws IllegalPathException {
+ public void groupByEventRawDataTest3() throws Exception {
int[][] result =
new int[][] {
{4019900, 613770, 11180, 827160, 7790, 1044950},
@@ -690,6 +705,7 @@ public class RawDataAggregationOperatorTest {
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -709,11 +725,12 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(6, count);
}
/** 0 - 199 200 - 259 260 - 299 300 - 379 380 - 399 400 - 499 */
@Test
- public void groupByEventRawDataTest4() throws IllegalPathException {
+ public void groupByEventRawDataTest4() throws Exception {
int[] result =
new int[] {
20000, 10200, 260, 10300, 380, 10400,
@@ -739,6 +756,7 @@ public class RawDataAggregationOperatorTest {
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -753,25 +771,25 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(6, count);
}
@Test
- public void onePointInOneEqualEventWindowTest() throws IllegalPathException {
+ public void onePointInOneEqualEventWindowTest() throws Exception {
WindowParameter windowParameter =
new VariationWindowParameter(TSDataType.INT32, 0, false, true, 0);
onePointInOneWindowTest(windowParameter);
}
@Test
- public void onePointInOneVariationEventWindowTest() throws IllegalPathException {
+ public void onePointInOneVariationEventWindowTest() throws Exception {
WindowParameter windowParameter =
new VariationWindowParameter(TSDataType.INT32, 0, false, true, 0.5);
onePointInOneWindowTest(windowParameter);
}
- private void onePointInOneWindowTest(WindowParameter windowParameter)
- throws IllegalPathException {
+ private void onePointInOneWindowTest(WindowParameter windowParameter) throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
List<List<InputLocation[]>> inputLocations = new ArrayList<>();
for (int i = 0; i < 2; i++) {
@@ -791,6 +809,7 @@ public class RawDataAggregationOperatorTest {
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
int resultMinTime1 = -1, resultMinTime2 = -1;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -815,12 +834,13 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(resultMinTime1, 499);
assertEquals(resultMinTime2, 499);
}
@Test
- public void groupBySessionRawDataTest1() throws IllegalPathException {
+ public void groupBySessionRawDataTest1() throws Exception {
int[][] result = new int[][] {{0}, {499}, {20000}, {10499}};
List<TAggregationType> aggregationTypes = new ArrayList<>();
List<List<InputLocation[]>> inputLocations = new ArrayList<>();
@@ -854,6 +874,7 @@ public class RawDataAggregationOperatorTest {
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
int count = 0;
+
while (rawDataAggregationOperator.isBlocked().isDone()
&& rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -875,6 +896,7 @@ public class RawDataAggregationOperatorTest {
}
}
}
+
assertEquals(1, count);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index 36c815927a..06ea323617 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -91,7 +91,7 @@ public class SeriesAggregationScanOperatorTest {
}
@Test
- public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+ public void testAggregationWithoutTimeFilter() throws Exception {
List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
List<Aggregator> aggregators = new ArrayList<>();
AccumulatorFactory.createAccumulators(
@@ -104,16 +104,18 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, true, null);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws IllegalPathException {
+ public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws Exception {
List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
List<Aggregator> aggregators = new ArrayList<>();
AccumulatorFactory.createAccumulators(
@@ -126,16 +128,18 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, false, null);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testMultiAggregationFuncWithoutTimeFilter1() throws IllegalPathException {
+ public void testMultiAggregationFuncWithoutTimeFilter1() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.COUNT);
aggregationTypes.add(TAggregationType.SUM);
@@ -150,17 +154,19 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, true, null);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testMultiAggregationFuncWithoutTimeFilter2() throws IllegalPathException {
+ public void testMultiAggregationFuncWithoutTimeFilter2() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.FIRST_VALUE);
aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -179,6 +185,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, true, null);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
@@ -189,12 +196,12 @@ public class SeriesAggregationScanOperatorTest {
assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc()
- throws IllegalPathException {
+ public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.FIRST_VALUE);
aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -213,6 +220,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, false, null);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
@@ -223,11 +231,12 @@ public class SeriesAggregationScanOperatorTest {
assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testAggregationWithTimeFilter1() throws IllegalPathException {
+ public void testAggregationWithTimeFilter1() throws Exception {
List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
List<Aggregator> aggregators = new ArrayList<>();
AccumulatorFactory.createAccumulators(
@@ -241,16 +250,18 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testAggregationWithTimeFilter2() throws IllegalPathException {
+ public void testAggregationWithTimeFilter2() throws Exception {
Filter timeFilter = TimeFilter.ltEq(379);
List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
List<Aggregator> aggregators = new ArrayList<>();
@@ -264,16 +275,18 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testAggregationWithTimeFilter3() throws IllegalPathException {
+ public void testAggregationWithTimeFilter3() throws Exception {
Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
List<Aggregator> aggregators = new ArrayList<>();
@@ -287,16 +300,18 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
assertEquals(resultTsBlock.getColumn(0).getLong(0), 300);
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testMultiAggregationWithTimeFilter() throws IllegalPathException {
+ public void testMultiAggregationWithTimeFilter() throws Exception {
List<TAggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(TAggregationType.FIRST_VALUE);
aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -316,6 +331,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
assertEquals(20100, resultTsBlock.getColumn(0).getInt(0));
@@ -326,11 +342,12 @@ public class SeriesAggregationScanOperatorTest {
assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
count++;
}
+
assertEquals(1, count);
}
@Test
- public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
+ public void testGroupByWithoutGlobalTimeFilter() throws Exception {
int[] result = new int[] {100, 100, 100, 99};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
@@ -345,6 +362,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
int positionCount = resultTsBlock.getPositionCount();
@@ -354,11 +372,12 @@ public class SeriesAggregationScanOperatorTest {
count++;
}
}
+
assertEquals(4, count);
}
@Test
- public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
+ public void testGroupByWithGlobalTimeFilter() throws Exception {
int[] result = new int[] {0, 80, 100, 80};
Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
@@ -374,6 +393,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, timeFilter, true, groupByTimeParameter);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
int positionCount = resultTsBlock.getPositionCount();
@@ -383,11 +403,12 @@ public class SeriesAggregationScanOperatorTest {
count++;
}
}
+
assertEquals(4, count);
}
@Test
- public void testGroupByWithMultiFunction() throws IllegalPathException {
+ public void testGroupByWithMultiFunction() throws Exception {
int[][] result =
new int[][] {
{20000, 20100, 10200, 10300},
@@ -412,6 +433,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
int positionCount = resultTsBlock.getPositionCount();
@@ -424,11 +446,12 @@ public class SeriesAggregationScanOperatorTest {
count++;
}
}
+
assertEquals(4, count);
}
@Test
- public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException {
+ public void testGroupByWithMultiFunctionOrderByTimeDesc() throws Exception {
int[][] result =
new int[][] {
{20000, 20100, 10200, 10300},
@@ -453,6 +476,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, false, groupByTimeParameter);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
int positionCount = resultTsBlock.getPositionCount();
@@ -465,11 +489,12 @@ public class SeriesAggregationScanOperatorTest {
count++;
}
}
+
assertEquals(4, count);
}
@Test
- public void testGroupBySlidingTimeWindow() throws IllegalPathException {
+ public void testGroupBySlidingTimeWindow() throws Exception {
int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 49};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
@@ -484,6 +509,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
int positionCount = resultTsBlock.getPositionCount();
@@ -493,11 +519,12 @@ public class SeriesAggregationScanOperatorTest {
count++;
}
}
+
assertEquals(result.length, count);
}
@Test
- public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
+ public void testGroupBySlidingTimeWindow2() throws Exception {
int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
@@ -513,6 +540,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
int positionCount = resultTsBlock.getPositionCount();
@@ -522,11 +550,12 @@ public class SeriesAggregationScanOperatorTest {
count++;
}
}
+
assertEquals(timeColumn.length, count);
}
@Test
- public void testGroupBySlidingWindowWithMultiFunction() throws IllegalPathException {
+ public void testGroupBySlidingWindowWithMultiFunction() throws Exception {
int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
int[][] result =
new int[][] {
@@ -552,6 +581,7 @@ public class SeriesAggregationScanOperatorTest {
SeriesAggregationScanOperator seriesAggregationScanOperator =
initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
+
while (seriesAggregationScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregationScanOperator.next();
int positionCount = resultTsBlock.getPositionCount();
@@ -564,6 +594,7 @@ public class SeriesAggregationScanOperatorTest {
count++;
}
}
+
assertEquals(timeColumn.length, count);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesScanOperatorTest.java
index d79cb7fad9..2c0721684f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesScanOperatorTest.java
@@ -80,7 +80,7 @@ public class SeriesScanOperatorTest {
}
@Test
- public void batchTest() {
+ public void batchTest() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
index b935d1d665..cd1a9a48a7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -194,7 +193,7 @@ public class SingleDeviceViewOperatorTest {
count++;
}
assertEquals(500, total);
- } catch (IllegalPathException e) {
+ } catch (Exception e) {
e.printStackTrace();
fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 50d2ab026f..105f468a59 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -133,7 +133,7 @@ public class SlidingWindowAggregationOperatorTest {
}
@Test
- public void slidingWindowAggregationTest() throws IllegalPathException {
+ public void slidingWindowAggregationTest() throws Exception {
String[] retArray =
new String[] {
"0,100,20049.5,2004950.0,20099,0,99,20000,20099,20000",
@@ -175,6 +175,7 @@ public class SlidingWindowAggregationOperatorTest {
count--;
}
}
+
Assert.assertEquals(0, count);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
index 70c768c1cf..ef122f78b8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
@@ -87,7 +87,7 @@ public class TimeJoinOperatorTest {
}
@Test
- public void batchTest1() {
+ public void batchTest1() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -182,7 +182,7 @@ public class TimeJoinOperatorTest {
/** test time join with non-exist sensor */
@Test
- public void batchTest2() {
+ public void batchTest2() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -298,7 +298,7 @@ public class TimeJoinOperatorTest {
/** test time join with non-exist sensor and order by time desc */
@Test
- public void batchTest3() {
+ public void batchTest3() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index 953da1b480..a9fc842802 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -113,7 +113,7 @@ public class UpdateLastCacheOperatorTest {
assertFalse(updateLastCacheOperator.hasNext());
assertTrue(updateLastCacheOperator.isFinished());
- } catch (IllegalPathException e) {
+ } catch (Exception e) {
e.printStackTrace();
fail();
}
@@ -143,7 +143,7 @@ public class UpdateLastCacheOperatorTest {
assertFalse(updateLastCacheOperator.hasNext());
assertTrue(updateLastCacheOperator.isFinished());
- } catch (IllegalPathException e) {
+ } catch (Exception e) {
e.printStackTrace();
fail();
}
@@ -173,7 +173,7 @@ public class UpdateLastCacheOperatorTest {
assertFalse(updateLastCacheOperator.hasNext());
assertTrue(updateLastCacheOperator.isFinished());
- } catch (IllegalPathException e) {
+ } catch (Exception e) {
e.printStackTrace();
fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
index ec10473b9b..5bf7e1a656 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
@@ -55,7 +55,7 @@ public class SchemaCountOperatorTest {
private static final String SCHEMA_COUNT_OPERATOR_TEST_SG = "root.SchemaCountOperatorTest";
@Test
- public void testSchemaCountOperator() {
+ public void testSchemaCountOperator() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -184,7 +184,7 @@ public class SchemaCountOperatorTest {
}
}
- private List<TsBlock> collectResult(CountGroupByLevelScanOperator<?> operator) {
+ private List<TsBlock> collectResult(CountGroupByLevelScanOperator<?> operator) throws Exception {
List<TsBlock> tsBlocks = new ArrayList<>();
while (operator.hasNext()) {
TsBlock tsBlock = operator.next();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
index 434d509c75..7adb9b07bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
@@ -64,7 +64,7 @@ public class SchemaQueryScanOperatorTest {
private static final String META_SCAN_OPERATOR_TEST_SG = "root.MetaScanOperatorTest";
@Test
- public void testDeviceSchemaScan() {
+ public void testDeviceSchemaScan() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
@@ -156,7 +156,7 @@ public class SchemaQueryScanOperatorTest {
}
@Test
- public void testTimeSeriesSchemaScan() {
+ public void testTimeSeriesSchemaScan() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index bb1e0bfb4b..671a24a39a 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -63,7 +63,7 @@ struct TGetDataBlockRequest {
1: required TFragmentInstanceId sourceFragmentInstanceId
2: required i32 startSequenceId
3: required i32 endSequenceId
- // index of upstream SinkHandle
+ // index of upstream SinkChannel
4: required i32 index
}
@@ -75,10 +75,16 @@ struct TAcknowledgeDataBlockEvent {
1: required TFragmentInstanceId sourceFragmentInstanceId
2: required i32 startSequenceId
3: required i32 endSequenceId
- // index of upstream SinkHandle
+ // index of upstream SinkChannel
4: required i32 index
}
+struct TCloseSinkChannelEvent {
+ 1: required TFragmentInstanceId sourceFragmentInstanceId
+ // index of upstream SinkChannel
+ 2: required i32 index
+}
+
struct TNewDataBlockEvent {
1: required TFragmentInstanceId targetFragmentInstanceId
2: required string targetPlanNodeId
@@ -766,6 +772,8 @@ service MPPDataExchangeService {
void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e);
+ void onCloseSinkChannelEvent(TCloseSinkChannelEvent e);
+
void onNewDataBlockEvent(TNewDataBlockEvent e);
void onEndOfDataBlockEvent(TEndOfDataBlockEvent e);