You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/05 12:53:30 UTC
[iotdb] 01/01: fix the issue that some FI stuck in FLUSHING states
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/fix_instance_flushing
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cbc42948a0ed01f5db8e1f5de7e301b2d4d02dbf
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 5 20:53:16 2022 +0800
fix the issue that some FI stuck in FLUSHING states
---
.../db/mpp/execution/datatransfer/DataBlockManager.java | 15 +++++++++------
.../db/mpp/execution/datatransfer/SourceHandle.java | 4 ++--
.../iotdb/db/mpp/plan/execution/QueryExecution.java | 4 +++-
.../db/mpp/plan/scheduler/SimpleQueryTerminator.java | 17 +++++++++++++----
.../db/service/thrift/impl/InternalServiceImpl.java | 12 ++++++++----
thrift/src/main/thrift/mpp.thrift | 1 +
6 files changed, 36 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
index c889f18f6d..b6674ee7f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
@@ -106,13 +106,16 @@ public class DataBlockManager implements IDataBlockManager {
e.getEndSequenceId(),
e.getSourceFragmentInstanceId());
if (!sinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
- throw new TException(
- "Source fragment instance not found. Fragment instance ID: "
- + e.getSourceFragmentInstanceId()
- + ".");
+ // In some scenarios, when the SourceHandle sends the data block ACK event, its upstream may
+ // have already been stopped. For example, in a query with LimitOperator, the downstream
+ // FragmentInstance may be finished, although the upstream is still working.
+ logger.warn(
+ "received ACK event but target fragment[{}] instance not found.",
+ e.getSourceFragmentInstanceId());
+ } else {
+ ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
+ .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
}
- ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
- .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
index 1c8bf70880..2102e98bfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
@@ -320,6 +320,8 @@ public class SourceHandle implements ISourceHandle {
TsBlock tsBlock = serde.deserialize(byteBuffer);
tsBlocks.add(tsBlock);
}
+ executorService.submit(
+ new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
synchronized (SourceHandle.this) {
if (aborted) {
return;
@@ -331,8 +333,6 @@ public class SourceHandle implements ISourceHandle {
blocked.set(null);
}
}
- executorService.submit(
- new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
break;
} catch (Throwable e) {
logger.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 763ca26a82..7f6078087f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -238,7 +238,9 @@ public class QueryExecution implements IQueryExecution {
ListenableFuture<Void> blocked = resultHandle.isBlocked();
blocked.get();
if (resultHandle.isFinished()) {
- releaseResource();
+ // Once the resultHandle is finished, we should transition the state of this query to FINISHED
+ // so that the corresponding cleanup work can be triggered.
+ stateMachine.transitionToFinished();
return Optional.empty();
}
return Optional.of(resultHandle.receive());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index df6f08f7c2..14334733ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -58,7 +59,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
@Override
public Future<Boolean> terminate() {
- List<TEndPoint> relatedHost = getRelatedHost(fragmentInstances);
+ List<TEndPoint> relatedHost = getRelatedHost();
return executor.submit(
() -> {
@@ -66,7 +67,8 @@ public class SimpleQueryTerminator implements IQueryTerminator {
// TODO (jackie tien) change the port
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+ client.cancelQuery(
+ new TCancelQueryReq(queryId.getId(), getRelatedFragmentInstances(endPoint)));
} catch (IOException e) {
LOGGER.error("can't connect to node {}", endPoint, e);
return false;
@@ -78,10 +80,17 @@ public class SimpleQueryTerminator implements IQueryTerminator {
});
}
- private List<TEndPoint> getRelatedHost(List<FragmentInstance> instances) {
- return instances.stream()
+ private List<TEndPoint> getRelatedHost() {
+ return fragmentInstances.stream()
.map(instance -> instance.getHostDataNode().internalEndPoint)
.distinct()
.collect(Collectors.toList());
}
+
+ private List<TFragmentInstanceId> getRelatedFragmentInstances(TEndPoint endPoint) {
+ return fragmentInstances.stream()
+ .filter(instance -> instance.getHostDataNode().internalEndPoint.equals(endPoint))
+ .map(instance -> instance.getId().toThrift())
+ .collect(Collectors.toList());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index f8b5778611..e2a1fff0c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -72,6 +72,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class InternalServiceImpl implements InternalService.Iface {
@@ -132,11 +133,14 @@ public class InternalServiceImpl implements InternalService.Iface {
@Override
public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
-
- // TODO need to be implemented and currently in order not to print NotImplementedException log,
- // we simply return null
+ List<FragmentInstanceId> taskIds =
+ req.getFragmentInstanceIds().stream()
+ .map(FragmentInstanceId::fromThrift)
+ .collect(Collectors.toList());
+ for (FragmentInstanceId taskId : taskIds) {
+ FragmentInstanceManager.getInstance().cancelTask(taskId);
+ }
return new TCancelResp(true);
- // throw new NotImplementedException();
}
@Override
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index 0c2acf6cb2..1d33d58cc4 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -105,6 +105,7 @@ struct TFragmentInstanceStateResp {
struct TCancelQueryReq {
1: required string queryId
+ 2: required list<TFragmentInstanceId> fragmentInstanceIds
}
struct TCancelPlanFragmentReq {