You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/24 13:36:03 UTC
[iotdb] branch master updated: Remove unnecessary synchronized on decrementCnt of SinkListener
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6a8d009a72 Remove unnecessary synchronized on decrementCnt of SinkListener
6a8d009a72 is described below
commit 6a8d009a7259fce5ca833146a8aa79b98d0b63e3
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Apr 24 21:35:57 2023 +0800
Remove unnecessary synchronized on decrementCnt of SinkListener
---
.../iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index b392f7eabf..6491a1cfb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -63,6 +63,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -401,7 +402,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
private final AtomicInteger cnt;
- private volatile boolean hasDecremented = false;
+ private final AtomicBoolean hasDecremented = new AtomicBoolean(false);
public ISinkChannelListenerImpl(
TFragmentInstanceId localFragmentInstanceId,
@@ -441,9 +442,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
}
}
- private synchronized void decrementCnt() {
- if (!hasDecremented) {
- hasDecremented = true;
+ private void decrementCnt() {
+ if (hasDecremented.compareAndSet(false, true)) {
if (cnt.decrementAndGet() == 0) {
closeShuffleSinkHandle();
}