You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/07/22 00:57:34 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] Fix the issue that Drivers of query with limit clause can not be finished timely

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 229bf689f9c [To rel/1.1] Fix the issue that Drivers of query with limit clause can not be finished timely
229bf689f9c is described below

commit 229bf689f9cff587798f2c02812eed7281c090f8
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Sat Jul 22 08:57:29 2023 +0800

    [To rel/1.1] Fix the issue that Drivers of query with limit clause can not be finished timely
---
 ...TsBlockFromClosedOrAbortedChannelException.java | 26 ++++++++++++++++++++++
 .../execution/exchange/MPPDataExchangeManager.java |  5 +++++
 .../mpp/execution/exchange/sink/SinkChannel.java   |  5 +++--
 .../execution/exchange/source/SourceHandle.java    |  4 ++--
 .../scheduler/FixedRateFragInsStateTracker.java    |  7 ++++--
 5 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/exception/exchange/GetTsBlockFromClosedOrAbortedChannelException.java b/server/src/main/java/org/apache/iotdb/db/mpp/exception/exchange/GetTsBlockFromClosedOrAbortedChannelException.java
new file mode 100644
index 00000000000..cdd6b5a31ae
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/exception/exchange/GetTsBlockFromClosedOrAbortedChannelException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.exception.exchange;
+
+public class GetTsBlockFromClosedOrAbortedChannelException extends IllegalStateException {
+  public GetTsBlockFromClosedOrAbortedChannelException(String s) {
+    super(s);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 886205d1306..b8c0957e4ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.db.mpp.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
 import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
@@ -112,6 +113,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           try {
             ByteBuffer serializedTsBlock = sinkChannel.getSerializedTsBlock(i);
             resp.addToTsBlocks(serializedTsBlock);
+          } catch (GetTsBlockFromClosedOrAbortedChannelException e) {
+            // Return an empty block list to indicate that getting data block failed this time.
+            // The SourceHandle will deal with this signal depending on its state.
+            return new TGetDataBlockResponse(new ArrayList<>());
           } catch (IllegalStateException | IOException e) {
             throw new TException(e);
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index b7d9309cf05..94522a55648 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -303,11 +304,11 @@ public class SinkChannel implements ISinkChannel {
 
   public synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
     if (aborted || closed) {
-      LOGGER.warn(
+      LOGGER.debug(
           "SinkChannel still receive getting TsBlock request after being aborted={} or closed={}",
           aborted,
           closed);
-      throw new IllegalStateException("SinkChannel is aborted or closed. ");
+      throw new GetTsBlockFromClosedOrAbortedChannelException("SinkChannel is aborted or closed. ");
     }
     Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId);
     if (pair == null || pair.left == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index acc52ceae5d..70fc0895324 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -668,7 +668,7 @@ public class SourceHandle implements ISourceHandle {
     public void run() {
       try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
         LOGGER.debug(
-            "[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
+            "[SendCloseSinkChannelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
             remoteFragmentInstanceId,
             indexOfUpstreamSinkHandle);
         int attempt = 0;
@@ -683,7 +683,7 @@ public class SourceHandle implements ISourceHandle {
             break;
           } catch (Throwable e) {
             LOGGER.warn(
-                "[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
+                "[SendCloseSinkChannelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
                 remoteFragmentInstanceId,
                 indexOfUpstreamSinkHandle);
             if (attempt == MAX_ATTEMPT_TIMES) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 2f389ec1b1a..9ed993dab89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -88,8 +88,11 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
     }
     for (FragmentInstanceId fragmentInstanceId : instanceIds) {
       InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId);
-      if (stateMetrics != null
-          && (stateMetrics.lastState == null || !stateMetrics.lastState.isDone())) {
+      if (stateMetrics == null
+          || stateMetrics.lastState == null
+          || !stateMetrics.lastState.isDone()) {
+        // FI whose state has not been updated is considered to be unfinished.(In Query with limit
+        // clause, it's possible that the query is finished before the state of FI being recorded.)
         res.add(fragmentInstanceId);
       }
     }