You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/25 09:37:33 UTC
[iotdb] branch master updated: Optimize source handle error handling (#5659)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 138bd00ff9 Optimize source handle error handling (#5659)
138bd00ff9 is described below
commit 138bd00ff998efd2671263adac4157d0490deabd
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Mon Apr 25 17:37:27 2022 +0800
Optimize source handle error handling (#5659)
---
.../iotdb/db/mpp/buffer/DataBlockManager.java | 58 +++++++++++++------
.../iotdb/db/mpp/buffer/IDataBlockManager.java | 5 +-
.../db/mpp/buffer/IDataBlockManagerCallback.java | 24 ++++++++
.../apache/iotdb/db/mpp/buffer/ISinkHandle.java | 10 ++--
.../apache/iotdb/db/mpp/buffer/ISourceHandle.java | 10 +++-
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 21 +++----
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 34 ++++-------
.../iotdb/db/mpp/execution/QueryExecution.java | 7 +--
.../db/mpp/operator/source/ExchangeOperator.java | 9 +--
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 3 +-
.../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 8 +--
.../iotdb/db/mpp/buffer/SourceHandleTest.java | 67 ++++++----------------
.../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 10 ++--
13 files changed, 132 insertions(+), 134 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 7a38ef6d09..d55de6f780 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -50,19 +50,21 @@ public class DataBlockManager implements IDataBlockManager {
private static final Logger logger = LoggerFactory.getLogger(DataBlockManager.class);
public interface SourceHandleListener {
- void onFinished(SourceHandle sourceHandle);
+ void onFinished(ISourceHandle sourceHandle);
- void onClosed(SourceHandle sourceHandle);
+ void onClosed(ISourceHandle sourceHandle);
+
+ void onFailure(ISourceHandle sourceHandle, Throwable t);
}
public interface SinkHandleListener {
- void onFinish(SinkHandle sinkHandle);
+ void onFinish(ISinkHandle sinkHandle);
- void onClosed(SinkHandle sinkHandle);
+ void onClosed(ISinkHandle sinkHandle);
- void onAborted(SinkHandle sinkHandle);
+ void onAborted(ISinkHandle sinkHandle);
- void onFailure(Throwable t);
+ void onFailure(ISinkHandle sinkHandle, Throwable t);
}
/** Handle thrift communications. */
@@ -169,8 +171,14 @@ public class DataBlockManager implements IDataBlockManager {
/** Listen to the state changes of a source handle. */
class SourceHandleListenerImpl implements SourceHandleListener {
+ private final IDataBlockManagerCallback<Throwable> onFailureCallback;
+
+ public SourceHandleListenerImpl(IDataBlockManagerCallback<Throwable> onFailureCallback) {
+ this.onFailureCallback = onFailureCallback;
+ }
+
@Override
- public void onFinished(SourceHandle sourceHandle) {
+ public void onFinished(ISourceHandle sourceHandle) {
logger.info("Release resources of finished source handle {}", sourceHandle);
if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
|| !sourceHandles
@@ -190,22 +198,33 @@ public class DataBlockManager implements IDataBlockManager {
}
@Override
- public void onClosed(SourceHandle sourceHandle) {
+ public void onClosed(ISourceHandle sourceHandle) {
onFinished(sourceHandle);
}
+
+ @Override
+ public void onFailure(ISourceHandle sourceHandle, Throwable t) {
+ logger.error("Source handle {} failed due to {}", sourceHandle, t);
+ if (onFailureCallback != null) {
+ onFailureCallback.call(t);
+ }
+ }
}
/** Listen to the state changes of a sink handle. */
class SinkHandleListenerImpl implements SinkHandleListener {
private final FragmentInstanceContext context;
+ private final IDataBlockManagerCallback<Throwable> onFailureCallback;
- public SinkHandleListenerImpl(FragmentInstanceContext context) {
+ public SinkHandleListenerImpl(
+ FragmentInstanceContext context, IDataBlockManagerCallback<Throwable> onFailureCallback) {
this.context = context;
+ this.onFailureCallback = onFailureCallback;
}
@Override
- public void onFinish(SinkHandle sinkHandle) {
+ public void onFinish(ISinkHandle sinkHandle) {
logger.info("Release resources of finished sink handle {}", sourceHandles);
if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
@@ -215,12 +234,12 @@ public class DataBlockManager implements IDataBlockManager {
}
@Override
- public void onClosed(SinkHandle sinkHandle) {
+ public void onClosed(ISinkHandle sinkHandle) {
context.transitionToFlushing();
}
@Override
- public void onAborted(SinkHandle sinkHandle) {
+ public void onAborted(ISinkHandle sinkHandle) {
logger.info("Release resources of aborted sink handle {}", sourceHandles);
if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
logger.info("Resources of aborted sink handle {} has already been released", sinkHandle);
@@ -229,8 +248,11 @@ public class DataBlockManager implements IDataBlockManager {
}
@Override
- public void onFailure(Throwable t) {
- context.failed(t);
+ public void onFailure(ISinkHandle sinkHandle, Throwable t) {
+ logger.error("Sink handle {} failed due to {}", sinkHandle, t);
+ if (onFailureCallback != null) {
+ onFailureCallback.call(t);
+ }
}
}
@@ -269,6 +291,7 @@ public class DataBlockManager implements IDataBlockManager {
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId,
+ // TODO: replace with callbacks to decouple DataBlockManager from FragmentInstanceContext
FragmentInstanceContext instanceContext) {
if (sinkHandles.containsKey(localFragmentInstanceId)) {
throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
@@ -290,7 +313,7 @@ public class DataBlockManager implements IDataBlockManager {
executorService,
clientFactory.getDataBlockServiceClient(remoteEndpoint),
tsBlockSerdeFactory.get(),
- new SinkHandleListenerImpl(instanceContext));
+ new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
sinkHandles.put(localFragmentInstanceId, sinkHandle);
return sinkHandle;
}
@@ -300,7 +323,8 @@ public class DataBlockManager implements IDataBlockManager {
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
TEndPoint remoteEndpoint,
- TFragmentInstanceId remoteFragmentInstanceId) {
+ TFragmentInstanceId remoteFragmentInstanceId,
+ IDataBlockManagerCallback<Throwable> onFailureCallback) {
if (sourceHandles.containsKey(localFragmentInstanceId)
&& sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
throw new IllegalStateException(
@@ -327,7 +351,7 @@ public class DataBlockManager implements IDataBlockManager {
executorService,
clientFactory.getDataBlockServiceClient(remoteEndpoint),
tsBlockSerdeFactory.get(),
- new SourceHandleListenerImpl());
+ new SourceHandleListenerImpl(onFailureCallback));
sourceHandles
.computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
.put(localPlanNodeId, sourceHandle);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index 0d60d346db..049f8b023d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -33,6 +33,7 @@ public interface IDataBlockManager {
* @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
* be sent to.
* @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
+ * @param instanceContext The context of the local fragment instance.
*/
ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
@@ -51,12 +52,14 @@ public interface IDataBlockManager {
* @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
* be received from.
* @param remoteFragmentInstanceId ID of the remote fragment instance.
+ * @param onFailureCallback The callback invoked with the cause when the source handle fails.
*/
ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
TEndPoint endpoint,
- TFragmentInstanceId remoteFragmentInstanceId);
+ TFragmentInstanceId remoteFragmentInstanceId,
+ IDataBlockManagerCallback<Throwable> onFailureCallback);
/**
* Release all the related resources of a fragment instance, including data blocks that are not
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManagerCallback.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManagerCallback.java
new file mode 100644
index 0000000000..ecafdd4a28
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManagerCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+public interface IDataBlockManagerCallback<T> {
+ public void call(T argument);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 99f92e3f45..b28b812175 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -18,21 +18,21 @@
*/
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
-import java.io.IOException;
import java.util.List;
public interface ISinkHandle {
+ /** Get the local fragment instance ID that this sink handle belongs to. */
+ TFragmentInstanceId getLocalFragmentInstanceId();
+
/** Get the total amount of memory used by buffered tsblocks. */
long getBufferRetainedSizeInBytes();
- /** Get the number of buffered tsblocks. */
- int getNumOfBufferedTsBlocks();
-
/** Get a future that will be completed when the output buffer is not full. */
ListenableFuture<Void> isFull();
@@ -48,7 +48,7 @@ public interface ISinkHandle {
* tsblock call is ignored. This can happen with limit queries. A {@link RuntimeException} will be
* thrown if any exception happened * during the data transmission.
*/
- void send(int partition, List<TsBlock> tsBlocks) throws IOException;
+ void send(int partition, List<TsBlock> tsBlocks);
/**
* Notify the handle that no more tsblocks will be sent. Any future calls to send a tsblock should
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
index 9e31918627..e9be12d838 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
@@ -18,15 +18,21 @@
*/
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Closeable;
-import java.io.IOException;
public interface ISourceHandle extends Closeable {
+ /** Get the local fragment instance ID that this source handle belongs to. */
+ TFragmentInstanceId getLocalFragmentInstanceId();
+
+ /** Get the local plan node ID that this source handle belongs to. */
+ String getLocalPlanNodeId();
+
/** Get the total amount of memory used by buffered tsblocks. */
long getBufferRetainedSizeInBytes();
@@ -34,7 +40,7 @@ public interface ISourceHandle extends Closeable {
* Get a {@link TsBlock}. If the source handle is blocked, a null will be returned. A {@link
* RuntimeException} will be thrown if any error happened.
*/
- TsBlock receive() throws IOException;
+ TsBlock receive();
/** If there are more tsblocks. */
boolean isFinished();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index c540103126..5d9cfc8263 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -71,9 +71,9 @@ public class SinkHandle implements ISinkHandle {
private volatile ListenableFuture<Void> blocked = immediateFuture(null);
private int nextSequenceId = 0;
- private long bufferRetainedSizeInBytes;
- private boolean closed;
- private boolean noMoreTsBlocks;
+ private long bufferRetainedSizeInBytes = 0;
+ private boolean closed = false;
+ private boolean noMoreTsBlocks = false;
public SinkHandle(
TEndPoint remoteEndpoint,
@@ -105,10 +105,6 @@ public class SinkHandle implements ISinkHandle {
}
private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
- // TODO: (xingtanzjr)
- // We temporarily make it sync instead of async to avoid EOS Event(SinkHandle close() method is
- // called) is sent before NewDataBlockEvent arrived
- // new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
}
@@ -248,7 +244,6 @@ public class SinkHandle implements ISinkHandle {
return bufferRetainedSizeInBytes;
}
- @Override
public int getNumOfBufferedTsBlocks() {
return sequenceIdToTsBlock.size();
}
@@ -289,19 +284,19 @@ public class SinkHandle implements ISinkHandle {
localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), freedBytes);
}
- TEndPoint getRemoteEndpoint() {
+ public TEndPoint getRemoteEndpoint() {
return remoteEndpoint;
}
- TFragmentInstanceId getRemoteFragmentInstanceId() {
+ public TFragmentInstanceId getRemoteFragmentInstanceId() {
return remoteFragmentInstanceId;
}
- String getRemotePlanNodeId() {
+ public String getRemotePlanNodeId() {
return remotePlanNodeId;
}
- TFragmentInstanceId getLocalFragmentInstanceId() {
+ public TFragmentInstanceId getLocalFragmentInstanceId() {
return localFragmentInstanceId;
}
@@ -364,7 +359,7 @@ public class SinkHandle implements ISinkHandle {
attempt,
e);
if (attempt == MAX_ATTEMPT_TIMES) {
- sinkHandleListener.onFailure(e);
+ sinkHandleListener.onFailure(SinkHandle.this, e);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index b249cb78d7..cbe5e9d540 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -37,7 +37,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -68,12 +67,11 @@ public class SourceHandle implements ISourceHandle {
private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
private volatile SettableFuture<Void> blocked = SettableFuture.create();
- private long bufferRetainedSizeInBytes;
+ private long bufferRetainedSizeInBytes = 0L;
private int currSequenceId = 0;
private int nextSequenceId = 0;
private int lastSequenceId = Integer.MAX_VALUE;
- private boolean closed;
- private Throwable throwable;
+ private boolean closed = false;
public SourceHandle(
TEndPoint remoteEndpoint,
@@ -98,10 +96,7 @@ public class SourceHandle implements ISourceHandle {
}
@Override
- public TsBlock receive() throws IOException {
- if (throwable != null) {
- throw new IOException(throwable);
- }
+ public TsBlock receive() {
if (closed) {
throw new IllegalStateException("Source handle is closed.");
}
@@ -196,9 +191,6 @@ public class SourceHandle implements ISourceHandle {
@Override
public ListenableFuture<Void> isBlocked() {
- if (throwable != null) {
- throw new RuntimeException(throwable);
- }
if (closed) {
throw new IllegalStateException("Source handle is closed.");
}
@@ -240,7 +232,7 @@ public class SourceHandle implements ISourceHandle {
@Override
public boolean isFinished() {
- return throwable == null && remoteTsBlockedConsumedUp();
+ return remoteTsBlockedConsumedUp();
}
// Return true indicates two points:
@@ -250,19 +242,19 @@ public class SourceHandle implements ISourceHandle {
return currSequenceId - 1 == lastSequenceId;
}
- TEndPoint getRemoteEndpoint() {
+ public TEndPoint getRemoteEndpoint() {
return remoteEndpoint;
}
- TFragmentInstanceId getRemoteFragmentInstanceId() {
+ public TFragmentInstanceId getRemoteFragmentInstanceId() {
return remoteFragmentInstanceId.deepCopy();
}
- TFragmentInstanceId getLocalFragmentInstanceId() {
+ public TFragmentInstanceId getLocalFragmentInstanceId() {
return localFragmentInstanceId;
}
- String getLocalPlanNodeId() {
+ public String getLocalPlanNodeId() {
return localPlanNodeId;
}
@@ -352,17 +344,11 @@ public class SourceHandle implements ISourceHandle {
attempt);
if (attempt == MAX_ATTEMPT_TIMES) {
synchronized (SourceHandle.this) {
- throwable = e;
bufferRetainedSizeInBytes -= reservedBytes;
localMemoryManager
.getQueryPool()
.free(localFragmentInstanceId.getQueryId(), reservedBytes);
- // That GetDataBlocksTask throws exception means some TsBlock cannot be
- // fetched permanently. Someone maybe waiting the SourceHandle to be
- // unblocked after fetching data. So the blocked should be released.
- if (blocked != null && !blocked.isDone()) {
- blocked.cancel(true);
- }
+ sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
}
@@ -406,7 +392,7 @@ public class SourceHandle implements ISourceHandle {
attempt);
if (attempt == MAX_ATTEMPT_TIMES) {
synchronized (this) {
- throwable = e;
+ sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index ad9a1c170a..0df9e2de4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -46,7 +46,6 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@@ -210,8 +209,7 @@ public class QueryExecution implements IQueryExecution {
return null;
}
return resultHandle.receive();
-
- } catch (ExecutionException | IOException | CancellationException e) {
+ } catch (ExecutionException | CancellationException e) {
stateMachine.transitionToFailed(e);
throwIfUnchecked(e.getCause());
throw new RuntimeException(e.getCause());
@@ -284,7 +282,8 @@ public class QueryExecution implements IQueryExecution {
new TEndPoint(
context.getResultNodeContext().getUpStreamEndpoint().getIp(),
IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
- context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
+ context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+ stateMachine::transitionToFailed);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
index 397b7dc461..26d4250acc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
-import java.io.IOException;
-
public class ExchangeOperator implements SourceOperator {
private final OperatorContext operatorContext;
@@ -51,12 +49,7 @@ public class ExchangeOperator implements SourceOperator {
@Override
public TsBlock next() {
- try {
- return sourceHandle.receive();
- } catch (IOException e) {
- throw new RuntimeException(
- "Error happened while reading from source handle " + sourceHandle, e);
- }
+ return sourceHandle.receive();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 230ffac797..f191e47c79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -356,7 +356,8 @@ public class LocalExecutionPlanner {
new TEndPoint(
source.getIp(),
IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
- remoteInstanceId.toThrift());
+ remoteInstanceId.toThrift(),
+ context.instanceContext::failed);
return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index 126faaeef5..d45eae5db9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -355,12 +355,12 @@ public class SinkHandleTest {
List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
// Construct a mock client.
Client mockClient = Mockito.mock(Client.class);
- TException exception = new TException("Mock exception");
+ TException mockException = new TException("Mock exception");
try {
- Mockito.doThrow(exception)
+ Mockito.doThrow(mockException)
.when(mockClient)
.onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
- Mockito.doThrow(exception)
+ Mockito.doThrow(mockException)
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
} catch (TException e) {
@@ -413,7 +413,7 @@ public class SinkHandleTest {
Assert.fail();
}
- Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFailure(exception);
+ Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFailure(sinkHandle, mockException);
// Close the SinkHandle.
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index dc84cff501..7f53a2f4ab 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -36,7 +36,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -137,12 +136,9 @@ public class SourceHandleTest {
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
- try {
- sourceHandle.receive();
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+
+ sourceHandle.receive();
+
if (i < numOfMockTsBlock - 1) {
Assert.assertTrue(sourceHandle.isBlocked().isDone());
} else {
@@ -260,12 +256,7 @@ public class SourceHandleTest {
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
Mockito.verify(spyMemoryPool, Mockito.times(i)).free(queryId, mockTsBlockSize);
- try {
- sourceHandle.receive();
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sourceHandle.receive();
try {
Thread.sleep(100L);
if (i < 5) {
@@ -425,12 +416,7 @@ public class SourceHandleTest {
// The local fragment instance consumes the data blocks.
for (int i = 0; i < 2 * numOfMockTsBlock; i++) {
- try {
- sourceHandle.receive();
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sourceHandle.receive();
if (i < 2 * numOfMockTsBlock - 1) {
Assert.assertTrue(sourceHandle.isBlocked().isDone());
} else {
@@ -477,12 +463,7 @@ public class SourceHandleTest {
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
- try {
- sourceHandle.receive();
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sourceHandle.receive();
if (i < numOfMockTsBlock - 1) {
Assert.assertTrue(sourceHandle.isBlocked().isDone());
} else {
@@ -530,8 +511,9 @@ public class SourceHandleTest {
TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
// Construct a mock client.
Client mockClient = Mockito.mock(Client.class);
+ TException mockException = new TException("Mock exception");
try {
- Mockito.doThrow(new TException("Mock exception"))
+ Mockito.doThrow(mockException)
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
} catch (TException e) {
@@ -550,7 +532,8 @@ public class SourceHandleTest {
mockClient,
mockTsBlockSerde,
mockSourceHandleListener);
- Assert.assertFalse(sourceHandle.isBlocked().isDone());
+ Future<?> blocked = sourceHandle.isBlocked();
+ Assert.assertFalse(blocked.isDone());
Assert.assertFalse(sourceHandle.isClosed());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -564,36 +547,20 @@ public class SourceHandleTest {
.collect(Collectors.toList()));
try {
Thread.sleep(100L);
- } catch (InterruptedException e) {
+ Mockito.verify(mockClient, Mockito.times(SourceHandle.MAX_ATTEMPT_TIMES))
+ .getDataBlock(Mockito.any());
+ } catch (InterruptedException | TException e) {
e.printStackTrace();
Assert.fail();
}
- try {
- Assert.assertFalse(sourceHandle.isBlocked().isDone());
- Assert.fail("Expect an IOException.");
- } catch (RuntimeException e) {
- Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
- }
- Assert.assertFalse(sourceHandle.isClosed());
- Assert.assertFalse(sourceHandle.isFinished());
- Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
- // The local fragment instance consumes the data blocks.
- try {
- sourceHandle.receive();
- Assert.fail("Expect an IOException.");
- } catch (IOException e) {
- Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
- }
-
- // Receive EndOfDataBlock event from upstream fragment instance.
- sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
- Assert.assertFalse(sourceHandle.isClosed());
- Assert.assertFalse(sourceHandle.isFinished());
- Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+ Mockito.verify(mockSourceHandleListener, Mockito.times(1))
+ .onFailure(sourceHandle, mockException);
+ Assert.assertFalse(blocked.isDone());
sourceHandle.close();
Assert.assertFalse(sourceHandle.isFinished());
+ Assert.assertTrue(blocked.isDone());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 49a13eddd7..ebd863d738 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.db.mpp.buffer;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -44,12 +44,12 @@ public class StubSinkHandle implements ISinkHandle {
}
@Override
- public long getBufferRetainedSizeInBytes() {
- return 0;
+ public TFragmentInstanceId getLocalFragmentInstanceId() {
+ return null;
}
@Override
- public int getNumOfBufferedTsBlocks() {
+ public long getBufferRetainedSizeInBytes() {
return 0;
}
@@ -64,7 +64,7 @@ public class StubSinkHandle implements ISinkHandle {
}
@Override
- public void send(int partition, List<TsBlock> tsBlocks) throws IOException {
+ public void send(int partition, List<TsBlock> tsBlocks) {
this.tsBlocks.addAll(tsBlocks);
}