You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/24 13:36:03 UTC

[iotdb] branch master updated: Remove unecessary synchronized on decrementCnt of SinkListener

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8d009a72 Remove unecessary synchronized on decrementCnt of SinkListener
6a8d009a72 is described below

commit 6a8d009a7259fce5ca833146a8aa79b98d0b63e3
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Apr 24 21:35:57 2023 +0800

    Remove unecessary synchronized on decrementCnt of SinkListener
---
 .../iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index b392f7eabf..6491a1cfb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -63,6 +63,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -401,7 +402,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
     private final AtomicInteger cnt;
 
-    private volatile boolean hasDecremented = false;
+    private final AtomicBoolean hasDecremented = new AtomicBoolean(false);
 
     public ISinkChannelListenerImpl(
         TFragmentInstanceId localFragmentInstanceId,
@@ -441,9 +442,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       }
     }
 
-    private synchronized void decrementCnt() {
-      if (!hasDecremented) {
-        hasDecremented = true;
+    private void decrementCnt() {
+      if (hasDecremented.compareAndSet(false, true)) {
         if (cnt.decrementAndGet() == 0) {
           closeShuffleSinkHandle();
         }