You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/07/22 00:57:34 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] Fix the issue that Drivers of query with limit clause can not be finished timely
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 229bf689f9c [To rel/1.1] Fix the issue that Drivers of query with limit clause can not be finished timely
229bf689f9c is described below
commit 229bf689f9cff587798f2c02812eed7281c090f8
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Sat Jul 22 08:57:29 2023 +0800
[To rel/1.1] Fix the issue that Drivers of query with limit clause can not be finished timely
---
...TsBlockFromClosedOrAbortedChannelException.java | 26 ++++++++++++++++++++++
.../execution/exchange/MPPDataExchangeManager.java | 5 +++++
.../mpp/execution/exchange/sink/SinkChannel.java | 5 +++--
.../execution/exchange/source/SourceHandle.java | 4 ++--
.../scheduler/FixedRateFragInsStateTracker.java | 7 ++++--
5 files changed, 41 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/exception/exchange/GetTsBlockFromClosedOrAbortedChannelException.java b/server/src/main/java/org/apache/iotdb/db/mpp/exception/exchange/GetTsBlockFromClosedOrAbortedChannelException.java
new file mode 100644
index 00000000000..cdd6b5a31ae
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/exception/exchange/GetTsBlockFromClosedOrAbortedChannelException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.exception.exchange;
+
+public class GetTsBlockFromClosedOrAbortedChannelException extends IllegalStateException {
+ public GetTsBlockFromClosedOrAbortedChannelException(String s) {
+ super(s);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 886205d1306..b8c0957e4ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.db.mpp.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
@@ -112,6 +113,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
try {
ByteBuffer serializedTsBlock = sinkChannel.getSerializedTsBlock(i);
resp.addToTsBlocks(serializedTsBlock);
+ } catch (GetTsBlockFromClosedOrAbortedChannelException e) {
+ // Return an empty block list to indicate that getting data block failed this time.
+ // The SourceHandle will deal with this signal depending on its state.
+ return new TGetDataBlockResponse(new ArrayList<>());
} catch (IllegalStateException | IOException e) {
throw new TException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index b7d9309cf05..94522a55648 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -303,11 +304,11 @@ public class SinkChannel implements ISinkChannel {
public synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
if (aborted || closed) {
- LOGGER.warn(
+ LOGGER.debug(
"SinkChannel still receive getting TsBlock request after being aborted={} or closed={}",
aborted,
closed);
- throw new IllegalStateException("SinkChannel is aborted or closed. ");
+ throw new GetTsBlockFromClosedOrAbortedChannelException("SinkChannel is aborted or closed. ");
}
Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId);
if (pair == null || pair.left == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index acc52ceae5d..70fc0895324 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -668,7 +668,7 @@ public class SourceHandle implements ISourceHandle {
public void run() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
LOGGER.debug(
- "[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
+ "[SendCloseSinkChannelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
remoteFragmentInstanceId,
indexOfUpstreamSinkHandle);
int attempt = 0;
@@ -683,7 +683,7 @@ public class SourceHandle implements ISourceHandle {
break;
} catch (Throwable e) {
LOGGER.warn(
- "[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
+ "[SendCloseSinkChannelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
remoteFragmentInstanceId,
indexOfUpstreamSinkHandle);
if (attempt == MAX_ATTEMPT_TIMES) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 2f389ec1b1a..9ed993dab89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -88,8 +88,11 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
}
for (FragmentInstanceId fragmentInstanceId : instanceIds) {
InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId);
- if (stateMetrics != null
- && (stateMetrics.lastState == null || !stateMetrics.lastState.isDone())) {
+ if (stateMetrics == null
+ || stateMetrics.lastState == null
+ || !stateMetrics.lastState.isDone()) {
+ // An FI whose state has not been recorded yet is treated as unfinished. (For a query with a
+ // limit clause, the query may finish before the state of the FI has been recorded.)
res.add(fragmentInstanceId);
}
}