You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/13 06:27:02 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1][IOTDB-5766] Refactor listener of ISinkChannel to ensure that ShuffleSinkHandle is closed properly

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 7ccd2f8f09 [To rel/1.1][IOTDB-5766] Refactor listener of ISinkChannel to ensure that ShuffleSinkHandle is closed properly
7ccd2f8f09 is described below

commit 7ccd2f8f094711d201584c91fafc6ba35afa9b13
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Thu Apr 13 14:26:56 2023 +0800

    [To rel/1.1][IOTDB-5766] Refactor listener of ISinkChannel to ensure that ShuffleSinkHandle is closed properly
---
 .../execution/exchange/MPPDataExchangeManager.java | 106 ++++++++++++++++++---
 .../mpp/execution/exchange/SharedTsBlockQueue.java |  23 +++--
 .../execution/exchange/sink/LocalSinkChannel.java  |   2 -
 .../execution/exchange/sink/ShuffleSinkHandle.java |  74 ++++++++------
 .../exchange/source/LocalSourceHandle.java         |   2 +-
 .../exchange/source/PipelineSourceHandle.java      |  39 ++++++++
 .../fragment/FragmentInstanceExecution.java        |   4 +
 7 files changed, 195 insertions(+), 55 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index a5370f0191..67f8bda606 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.execution.exchange.sink.ShuffleSinkHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.SinkChannel;
 import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.PipelineSourceHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
@@ -62,6 +63,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -390,17 +392,86 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     }
   }
 
+  class ISinkChannelListenerImpl implements SinkListener {
+
+    private final TFragmentInstanceId shuffleSinkHandleId;
+
+    private final FragmentInstanceContext context;
+    private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;
+
+    private final AtomicInteger cnt;
+
+    private volatile boolean hasDecremented = false;
+
+    public ISinkChannelListenerImpl(
+        TFragmentInstanceId localFragmentInstanceId,
+        FragmentInstanceContext context,
+        IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
+        AtomicInteger cnt) {
+      this.shuffleSinkHandleId = localFragmentInstanceId;
+      this.context = context;
+      this.onFailureCallback = onFailureCallback;
+      this.cnt = cnt;
+    }
+
+    @Override
+    public void onFinish(ISink sink) {
+      LOGGER.debug("[SkHListenerOnFinish]");
+      decrementCnt();
+    }
+
+    @Override
+    public void onEndOfBlocks(ISink sink) {
+      LOGGER.debug("[SkHListenerOnEndOfTsBlocks]");
+    }
+
+    @Override
+    public Optional<Throwable> onAborted(ISink sink) {
+      LOGGER.debug("[SkHListenerOnAbort]");
+      decrementCnt();
+      return context.getFailureCause();
+    }
+
+    @Override
+    public void onFailure(ISink sink, Throwable t) {
+      LOGGER.warn("ISinkChannel failed due to", t);
+      decrementCnt();
+      if (onFailureCallback != null) {
+        onFailureCallback.call(t);
+      }
+    }
+
+    private synchronized void decrementCnt() {
+      if (!hasDecremented) {
+        hasDecremented = true;
+        if (cnt.decrementAndGet() == 0) {
+          closeShuffleSinkHandle();
+        }
+      }
+    }
+
+    private void closeShuffleSinkHandle() {
+      ISinkHandle sinkHandle = shuffleSinkHandles.remove(shuffleSinkHandleId);
+      if (sinkHandle != null) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Close ShuffleSinkHandle: {}", shuffleSinkHandleId);
+        }
+        sinkHandle.close();
+      }
+    }
+  }
+
   /**
    * Listen to the state changes of a sink handle of pipeline. And since the finish of pipeline sink
    * handle doesn't equal the finish of the whole fragment, therefore we don't need to notify
    * fragment context. But if it's aborted or failed, it can lead to the total fail.
    */
-  static class SinkListenerImpl implements SinkListener {
+  static class PipelineSinkListenerImpl implements SinkListener {
 
     private final FragmentInstanceContext context;
     private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;
 
-    public SinkListenerImpl(
+    public PipelineSinkListenerImpl(
         FragmentInstanceContext context,
         IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
       this.context = context;
@@ -477,7 +548,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       String localPlanNodeId,
       // TODO: replace with callbacks to decouple MPPDataExchangeManager from
       // FragmentInstanceContext
-      FragmentInstanceContext instanceContext) {
+      FragmentInstanceContext instanceContext,
+      AtomicInteger cnt) {
 
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug(
@@ -502,7 +574,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     return new LocalSinkChannel(
         localFragmentInstanceId,
         queue,
-        new SinkListenerImpl(instanceContext, instanceContext::failed));
+        new ISinkChannelListenerImpl(
+            localFragmentInstanceId, instanceContext, instanceContext::failed, cnt));
   }
 
   /**
@@ -522,7 +595,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     queue.allowAddingTsBlock();
     return new LocalSinkChannel(
         queue,
-        new SinkListenerImpl(driverContext.getFragmentInstanceContext(), driverContext::failed));
+        new PipelineSinkListenerImpl(
+            driverContext.getFragmentInstanceContext(), driverContext::failed));
   }
 
   private ISinkChannel createSinkChannel(
@@ -533,7 +607,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       String localPlanNodeId,
       // TODO: replace with callbacks to decouple MPPDataExchangeManager from
       // FragmentInstanceContext
-      FragmentInstanceContext instanceContext) {
+      FragmentInstanceContext instanceContext,
+      AtomicInteger cnt) {
 
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug(
@@ -552,7 +627,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         localMemoryManager,
         executorService,
         tsBlockSerdeFactory.get(),
-        new SinkListenerImpl(instanceContext, instanceContext::failed),
+        new ISinkChannelListenerImpl(
+            localFragmentInstanceId, instanceContext, instanceContext::failed, cnt),
         mppDataExchangeServiceClientManager);
   }
 
@@ -571,6 +647,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           "ShuffleSinkHandle for " + localFragmentInstanceId + " exists.");
     }
 
+    int channelNum = downStreamChannelLocationList.size();
+    AtomicInteger cnt = new AtomicInteger(channelNum);
     List<ISinkChannel> downStreamChannelList =
         downStreamChannelLocationList.stream()
             .map(
@@ -579,7 +657,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
                         localFragmentInstanceId,
                         localPlanNodeId,
                         downStreamChannelLocation,
-                        instanceContext))
+                        instanceContext,
+                        cnt))
             .collect(Collectors.toList());
 
     ShuffleSinkHandle shuffleSinkHandle =
@@ -598,14 +677,16 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       DownStreamChannelLocation downStreamChannelLocation,
-      FragmentInstanceContext instanceContext) {
+      FragmentInstanceContext instanceContext,
+      AtomicInteger cnt) {
     if (isSameNode(downStreamChannelLocation.getRemoteEndpoint())) {
       return createLocalSinkChannel(
           localFragmentInstanceId,
           downStreamChannelLocation.getRemoteFragmentInstanceId(),
           downStreamChannelLocation.getRemotePlanNodeId(),
           localPlanNodeId,
-          instanceContext);
+          instanceContext,
+          cnt);
     } else {
       return createSinkChannel(
           localFragmentInstanceId,
@@ -613,7 +694,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           downStreamChannelLocation.getRemoteFragmentInstanceId(),
           downStreamChannelLocation.getRemotePlanNodeId(),
           localPlanNodeId,
-          instanceContext);
+          instanceContext,
+          cnt);
     }
   }
 
@@ -626,7 +708,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Create local source handle for {}", context.getDriverTaskID());
     }
-    return new LocalSourceHandle(
+    return new PipelineSourceHandle(
         queue,
         new PipelineSourceHandleListenerImpl(context::failed),
         context.getDriverTaskID().toString());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 53d668cc86..ca1a42746b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -147,7 +147,7 @@ public class SharedTsBlockQueue {
     this.sourceHandle = sourceHandle;
   }
 
-  /** Notify no more tsblocks will be added to the queue. */
+  /** Notify no more TsBlocks will be added to the queue. */
   public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
     LOGGER.debug("[SignalNoMoreTsBlockOnQueue]");
     if (closed) {
@@ -164,7 +164,7 @@ public class SharedTsBlockQueue {
   }
 
   /**
-   * Remove a tsblock from the head of the queue and return. Should be invoked only when the future
+   * Remove a TsBlock from the head of the queue and return. Should be invoked only when the future
    * returned by {@link #isBlocked()} completes.
    */
   public TsBlock remove() {
@@ -172,11 +172,6 @@ public class SharedTsBlockQueue {
       throw new IllegalStateException("queue has been destroyed");
     }
     TsBlock tsBlock = queue.remove();
-    // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
-    // corresponding LocalSinkChannel.
-    if (sinkChannel != null) {
-      sinkChannel.checkAndInvokeOnFinished();
-    }
     localMemoryManager
         .getQueryPool()
         .free(
@@ -185,6 +180,11 @@ public class SharedTsBlockQueue {
             localPlanNodeId,
             tsBlock.getRetainedSizeInBytes());
     bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+    // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
+    // corresponding LocalSinkChannel.
+    if (sinkChannel != null) {
+      sinkChannel.checkAndInvokeOnFinished();
+    }
     if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
       blocked = SettableFuture.create();
     }
@@ -192,17 +192,19 @@ public class SharedTsBlockQueue {
   }
 
   /**
-   * Add tsblocks to the queue. Except the first invocation, this method should be invoked only when
+   * Add TsBlocks to the queue. Except the first invocation, this method should be invoked only when
    * the returned future of last invocation completes.
    */
   public ListenableFuture<Void> add(TsBlock tsBlock) {
     if (closed) {
-      LOGGER.warn("queue has been destroyed");
+      // queue may have been closed
       return immediateVoidFuture();
     }
 
     Validate.notNull(tsBlock, "TsBlock cannot be null");
-    Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
+
+    Validate.isTrue(
+        blockedOnMemory == null || blockedOnMemory.isDone(), "SharedTsBlockQueue is full");
     Pair<ListenableFuture<Void>, Boolean> pair =
         localMemoryManager
             .getQueryPool()
@@ -264,6 +266,7 @@ public class SharedTsBlockQueue {
         .getQueryPool()
         .clearMemoryReservationMap(
             localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
+    sinkChannel.close();
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index afd043c9af..0e434aae80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -194,8 +194,6 @@ public class LocalSinkChannel implements ISinkChannel {
   private void checkState() {
     if (aborted) {
       throw new IllegalStateException("LocalSinkChannel is aborted.");
-    } else if (closed) {
-      throw new IllegalStateException("LocalSinkChannel is closed.");
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index a4d3f7c198..b1e155b22f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -26,14 +26,14 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE;
 
@@ -48,8 +48,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   private final boolean[] channelOpened;
 
-  private final Set<Integer> closedChannel = Sets.newConcurrentHashSet();
-
   private final DownStreamChannelIndex downStreamChannelIndex;
 
   private final int channelNum;
@@ -62,12 +60,14 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   private final MPPDataExchangeManager.SinkListener sinkListener;
 
-  private boolean aborted = false;
+  private volatile boolean aborted = false;
 
-  private boolean closed = false;
+  private volatile boolean closed = false;
 
   private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
+  private final Lock lock = new ReentrantLock();
+
   /** max bytes this ShuffleSinkHandle can reserve. */
   private long maxBytesCanReserve =
       IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
@@ -126,27 +126,45 @@ public class ShuffleSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public synchronized void setNoMoreTsBlocks() {
-    for (int i = 0; i < downStreamChannelList.size(); i++) {
-      if (!hasSetNoMoreTsBlocks[i]) {
-        downStreamChannelList.get(i).setNoMoreTsBlocks();
-        hasSetNoMoreTsBlocks[i] = true;
+  public void setNoMoreTsBlocks() {
+    if (closed || aborted) {
+      return;
+    }
+    try {
+      lock.lock();
+      for (int i = 0; i < downStreamChannelList.size(); i++) {
+        if (!hasSetNoMoreTsBlocks[i]) {
+          downStreamChannelList.get(i).setNoMoreTsBlocks();
+          hasSetNoMoreTsBlocks[i] = true;
+        }
       }
+      sinkListener.onEndOfBlocks(this);
+    } finally {
+      lock.unlock();
     }
-    sinkListener.onEndOfBlocks(this);
   }
 
   @Override
-  public synchronized void setNoMoreTsBlocksOfOneChannel(int channelIndex) {
-    if (!hasSetNoMoreTsBlocks[channelIndex]) {
-      downStreamChannelList.get(channelIndex).setNoMoreTsBlocks();
-      hasSetNoMoreTsBlocks[channelIndex] = true;
+  public void setNoMoreTsBlocksOfOneChannel(int channelIndex) {
+    if (closed || aborted) {
+      // if this ShuffleSinkHandle has been closed, Driver.close() will attempt to setNoMoreTsBlocks
+      // for all the channels
+      return;
+    }
+    try {
+      lock.lock();
+      if (!hasSetNoMoreTsBlocks[channelIndex]) {
+        downStreamChannelList.get(channelIndex).setNoMoreTsBlocks();
+        hasSetNoMoreTsBlocks[channelIndex] = true;
+      }
+    } finally {
+      lock.unlock();
     }
   }
 
   @Override
   public boolean isClosed() {
-    return closedChannel.size() == downStreamChannelList.size();
+    return closed;
   }
 
   @Override
@@ -165,10 +183,11 @@ public class ShuffleSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public synchronized void abort() {
+  public void abort() {
     if (aborted || closed) {
       return;
     }
+    aborted = true;
     LOGGER.debug("[StartAbortShuffleSinkHandle]");
     boolean meetError = false;
     Exception firstException = null;
@@ -185,16 +204,20 @@ public class ShuffleSinkHandle implements ISinkHandle {
     if (meetError) {
       LOGGER.warn("Error occurred when try to abort channel.", firstException);
     }
-    aborted = true;
     sinkListener.onAborted(this);
     LOGGER.debug("[EndAbortShuffleSinkHandle]");
   }
 
+  // Adding synchronized to this method may lead to a deadlock:
+  // a LocalSinkChannel may invoke this close method and try to lock the
+  // ShuffleSinkHandle while a synchronized method of ShuffleSinkHandle already
+  // holds that lock and is waiting to lock the LocalSinkChannel.
   @Override
-  public synchronized void close() {
+  public void close() {
     if (closed || aborted) {
       return;
     }
+    closed = true;
     LOGGER.debug("[StartCloseShuffleSinkHandle]");
     boolean meetError = false;
     Exception firstException = null;
@@ -211,7 +234,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
     if (meetError) {
       LOGGER.warn("Error occurred when try to close channel.", firstException);
     }
-    closed = true;
     sinkListener.onFinish(this);
     LOGGER.debug("[EndCloseShuffleSinkHandle]");
   }
@@ -244,15 +266,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   @Override
   public boolean isChannelClosed(int index) {
-    if (closedChannel.contains(index)) {
-      return true;
-    } else {
-      if (downStreamChannelList.get(index).isClosed()) {
-        closedChannel.add(index);
-        return true;
-      }
-      return false;
-    }
+    return downStreamChannelList.get(index).isClosed();
   }
 
   // region ============ Shuffle Related ============
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index 7dc6ad2983..9ffe6e0948 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -48,7 +48,7 @@ public class LocalSourceHandle implements ISourceHandle {
   private TFragmentInstanceId localFragmentInstanceId;
   private String localPlanNodeId;
   private final SourceHandleListener sourceHandleListener;
-  private final SharedTsBlockQueue queue;
+  protected final SharedTsBlockQueue queue;
   private boolean aborted = false;
 
   private boolean closed = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/PipelineSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/PipelineSourceHandle.java
new file mode 100644
index 0000000000..e4b408cd9a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/PipelineSourceHandle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.exchange.source;
+
+import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
+import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
+
+public class PipelineSourceHandle extends LocalSourceHandle {
+
+  public PipelineSourceHandle(
+      SharedTsBlockQueue queue,
+      MPPDataExchangeManager.SourceHandleListener sourceHandleListener,
+      String threadName) {
+    super(queue, sourceHandleListener, threadName);
+  }
+
+  @Override
+  public void setMaxBytesCanReserve(long maxBytesCanReserve) {
+    if (maxBytesCanReserve < queue.getMaxBytesCanReserve()) {
+      queue.setMaxBytesCanReserve(maxBytesCanReserve);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 22377fc9ad..765837b33e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -117,6 +117,10 @@ public class FragmentInstanceExecution {
               return;
             }
 
+            if (LOGGER.isDebugEnabled()) {
+              LOGGER.debug("Enter the stateChangeListener");
+            }
+
             // Update failed tasks counter
             if (newState == FAILED) {
               failedInstances.update(1);