You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/26 18:17:56 UTC

[pulsar] branch master updated: Fix non-atomic volatile update (#6606)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d298c8b  Fix non-atomic volatile update (#6606)
d298c8b is described below

commit d298c8b3bfe42643d2e6b5f0ba0b606bf4a1294d
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Fri Mar 27 02:17:41 2020 +0800

    Fix non-atomic volatile update (#6606)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 30 ++++++++++++++++------
 .../mledger/impl/ReadOnlyCursorImpl.java           |  3 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    |  7 +++--
 .../PersistentDispatcherMultipleConsumers.java     |  6 +++--
 ...istentStickyKeyDispatcherMultipleConsumers.java |  2 +-
 .../client/impl/BatchMessageContainerImpl.java     |  2 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 12 +++++++--
 .../client/impl/PulsarServiceNameResolver.java     |  5 +++-
 .../util/collections/ConcurrentLongHashMap.java    | 12 ++++++---
 .../util/collections/ConcurrentLongPairSet.java    |  7 +++--
 .../util/collections/ConcurrentOpenHashMap.java    |  7 +++--
 .../util/collections/ConcurrentOpenHashSet.java    |  9 ++++---
 .../collections/GrowablePriorityLongPairQueue.java |  5 +++-
 .../evictors/WatermarkCountEvictionPolicy.java     |  5 +++-
 14 files changed, 81 insertions(+), 31 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index e9ab776..97dd1b34 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -106,7 +107,13 @@ public class ManagedCursorImpl implements ManagedCursor {
     private final BookKeeper.DigestType digestType;
 
     protected volatile PositionImpl markDeletePosition;
+
+    protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
     protected volatile PositionImpl readPosition;
+
+    protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry> LAST_MARK_DELETE_ENTRY_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, MarkDeleteEntry.class, "lastMarkDeleteEntry");
     protected volatile MarkDeleteEntry lastMarkDeleteEntry;
 
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
@@ -125,6 +132,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     @SuppressWarnings("unused")
     private volatile int pendingReadOps = 0;
 
+    private static final AtomicLongFieldUpdater<ManagedCursorImpl> MSG_CONSUMED_COUNTER_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(ManagedCursorImpl.class, "messagesConsumedCounter");
     // This counters are used to compute the numberOfEntries and numberOfEntriesInBacklog values, without having to look
     // at the list of ledgers in the ml. They are initialized to (-backlog) at opening, and will be incremented each
     // time a message is read or deleted.
@@ -884,11 +893,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                     PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition);
 
                     if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
-                        messagesConsumedCounter -= getNumberOfEntries(
-                                Range.closedOpen(newMarkDeletePosition, markDeletePosition));
+                        MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
+                                Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
                     } else {
-                        messagesConsumedCounter += getNumberOfEntries(
-                                Range.closedOpen(markDeletePosition, newMarkDeletePosition));
+                        MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries(
+                                Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
                     }
                     markDeletePosition = newMarkDeletePosition;
                     lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
@@ -1428,7 +1437,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.debug("[{}] Moved ack position from: {} to: {} -- skipped: {}", ledger.getName(),
                         oldMarkDeletePosition, newMarkDeletePosition, skippedEntries);
             }
-            messagesConsumedCounter += skippedEntries;
+            MSG_CONSUMED_COUNTER_UPDATER.addAndGet(this, skippedEntries);
         }
 
         // markDelete-position and clear out deletedMsgSet
@@ -1720,7 +1729,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 PositionImpl previousPosition = ledger.getPreviousPosition(position);
                 individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
                         position.getLedgerId(), position.getEntryId());
-                ++messagesConsumedCounter;
+                MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
@@ -1766,8 +1775,9 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         // Apply rate limiting to mark-delete operations
         if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
-            lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, lastMarkDeleteEntry.properties, null,
-                    null);
+            PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition;
+            LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this,
+                    last -> new MarkDeleteEntry(finalNewMarkDeletePosition, last.properties, null, null));
             callback.deleteComplete(ctx);
             return;
         }
@@ -2666,5 +2676,9 @@ public class ManagedCursorImpl implements ManagedCursor {
                 || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()));
     }
 
+    private ManagedCursorImpl cursorImpl() {
+        return this;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 8c9da1d..533b2f8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -54,7 +54,8 @@ public class ReadOnlyCursorImpl extends ManagedCursorImpl implements ReadOnlyCur
     @Override
     public void skipEntries(int numEntriesToSkip) {
         log.info("[{}] Skipping {} entries on read-only cursor {}", ledger.getName(), numEntriesToSkip);
-        readPosition = ledger.getPositionAfterN(readPosition, numEntriesToSkip, PositionBound.startIncluded).getNext();
+        READ_POSITION_UPDATER.getAndUpdate(this, lastRead ->
+                ledger.getPositionAfterN(lastRead, numEntriesToSkip, PositionBound.startIncluded).getNext());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 98970e8..7732bd8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,6 +41,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.stream.Collectors;
 
 import javax.naming.AuthenticationException;
@@ -161,6 +162,8 @@ public class ServerCnx extends PulsarHandler {
     private FeatureFlags features;
     // Flag to manage throttling-publish-buffer by atomically enable/disable read-channel.
     private volatile boolean autoReadDisabledPublishBufferLimiting = false;
+    private static final AtomicLongFieldUpdater<ServerCnx> MSG_PUBLISH_BUFFER_SIZE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(ServerCnx.class, "messagePublishBufferSize");
     private volatile long messagePublishBufferSize = 0;
 
     enum State {
@@ -1759,7 +1762,7 @@ public class ServerCnx extends PulsarHandler {
     }
 
     public void startSendOperation(Producer producer, int msgSize) {
-        messagePublishBufferSize += msgSize;
+        MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, msgSize);
         boolean isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
         if (++pendingSendRequest == maxPendingSendRequests || isPublishRateExceeded) {
             // When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
@@ -1775,7 +1778,7 @@ public class ServerCnx extends PulsarHandler {
     }
 
     public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
-        messagePublishBufferSize -= msgSize;
+        MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize);
         if (--pendingSendRequest == resumeReadsThreshold) {
             // Resume reading from socket
             ctx.channel().config().setAutoRead(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9457af3..65e9a32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -88,6 +88,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     private boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
 
+    protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
     protected volatile int totalAvailablePermits = 0;
     private volatile int readBatchSize;
     private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
@@ -500,10 +502,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 c.sendMessages(entriesForThisConsumer, batchSizes, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), redeliveryTracker);
 
-                long msgSent = sendMessageInfo.getTotalMessages();
+                int msgSent = sendMessageInfo.getTotalMessages();
                 start += messagesForC;
                 entriesToDispatch -= messagesForC;
-                totalAvailablePermits -= msgSent;
+                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -msgSent);
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 7953942..abe0cf7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -116,7 +116,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 });
                 entriesWithSameKey.getValue().removeAll(subList);
 
-                totalAvailablePermits -= sendMessageInfo.getTotalMessages();
+                TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -sendMessageInfo.getTotalMessages());
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index bcbd1c3..937a3e9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -88,7 +88,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
             messageMetadata.setSequenceId(lowestSequenceId);
         }
         highestSequenceId = msg.getSequenceId();
-        producer.lastSequenceIdPushed = Math.max(producer.lastSequenceIdPushed, msg.getSequenceId());
+        ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
 
         return isBatchFull();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5b8996c..977a7b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -111,7 +111,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     private final CompressionCodec compressor;
 
+    static final AtomicLongFieldUpdater<ProducerImpl> LAST_SEQ_ID_PUBLISHED_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ProducerImpl.class, "lastSequenceIdPublished");
     private volatile long lastSequenceIdPublished;
+
+    static final AtomicLongFieldUpdater<ProducerImpl> LAST_SEQ_ID_PUSHED_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ProducerImpl.class, "lastSequenceIdPushed");
     protected volatile long lastSequenceIdPushed;
     private volatile boolean isLastSequenceIdPotentialDuplicated;
 
@@ -847,7 +852,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         if (callback) {
             op = pendingCallbacks.poll();
             if (op != null) {
-                lastSequenceIdPublished = Math.max(lastSequenceIdPublished, getHighestSequenceId(op));
+                OpSendMsg finalOp = op;
+                LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this,
+                        last -> Math.max(last, getHighestSequenceId(finalOp)));
                 op.setMessageId(ledgerId, entryId, partitionIndex);
                 try {
                     // Need to protect ourselves from any exception being thrown in the future handler from the
@@ -1481,7 +1488,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             }
             pendingMessages.put(op);
             if (op.msg != null) {
-                lastSequenceIdPushed = Math.max(lastSequenceIdPushed, getHighestSequenceId(op));
+                LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
+                        last -> Math.max(last, getHighestSequenceId(op)));
             }
             ClientCnx cnx = cnx();
             if (isConnected()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index 443cfc5..a3f3872 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
 import org.apache.pulsar.common.net.ServiceURI;
@@ -38,6 +39,8 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
 
     private volatile ServiceURI serviceUri;
     private volatile String serviceUrl;
+    private static final AtomicIntegerFieldUpdater<PulsarServiceNameResolver> CURRENT_INDEX_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PulsarServiceNameResolver.class, "currentIndex");
     private volatile int currentIndex;
     private volatile List<InetSocketAddress> addressList;
 
@@ -51,7 +54,7 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
         if (list.size() == 1) {
             return list.get(0);
         } else {
-            currentIndex = (currentIndex + 1) % list.size();
+            CURRENT_INDEX_UPDATER.getAndUpdate(this, last -> (last + 1) % list.size());
             return list.get(currentIndex);
 
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index 14a6fda..3b3716c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Lists;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.LongFunction;
 
@@ -195,6 +196,9 @@ public class ConcurrentLongHashMap<V> {
         private volatile V[] values;
 
         private volatile int capacity;
+        private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
+                AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
+
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -282,12 +286,12 @@ public class ConcurrentLongHashMap<V> {
                     if (storedKey == key) {
                         if (storedValue == EmptyValue) {
                             values[bucket] = value != null ? value : valueProvider.apply(key);
-                            ++size;
+                            SIZE_UPDATER.incrementAndGet(this);
                             ++usedBuckets;
                             return valueProvider != null ? values[bucket] : null;
                         } else if (storedValue == DeletedValue) {
                             values[bucket] = value != null ? value : valueProvider.apply(key);
-                            ++size;
+                            SIZE_UPDATER.incrementAndGet(this);
                             return valueProvider != null ? values[bucket] : null;
                         } else if (!onlyIfAbsent) {
                             // Over written an old value for same key
@@ -307,7 +311,7 @@ public class ConcurrentLongHashMap<V> {
 
                         keys[bucket] = key;
                         values[bucket] = value != null ? value : valueProvider.apply(key);
-                        ++size;
+                        SIZE_UPDATER.incrementAndGet(this);
                         return valueProvider != null ? values[bucket] : null;
                     } else if (storedValue == DeletedValue) {
                         // The bucket contained a different deleted key
@@ -348,7 +352,7 @@ public class ConcurrentLongHashMap<V> {
                                 return null;
                             }
 
-                            --size;
+                            SIZE_UPDATER.decrementAndGet(this);
                             V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
                             if (nextValueInArray == EmptyValue) {
                                 values[bucket] = (V) EmptyValue;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index 5e2d499..0b94206 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.StampedLock;
 
 /**
@@ -214,6 +215,8 @@ public class ConcurrentLongPairSet implements LongPairSet {
         private volatile long[] table;
 
         private volatile int capacity;
+        private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER = AtomicIntegerFieldUpdater
+                .newUpdater(Section.class, "size");
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -300,7 +303,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
 
                         table[bucket] = item1;
                         table[bucket + 1] = item2;
-                        ++size;
+                        SIZE_UPDATER.incrementAndGet(this);
                         return true;
                     } else if (storedItem1 == DeletedItem) {
                         // The bucket contained a different deleted key
@@ -333,7 +336,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
                     long storedItem1 = table[bucket];
                     long storedItem2 = table[bucket + 1];
                     if (item1 == storedItem1 && item2 == storedItem2) {
-                        --size;
+                        SIZE_UPDATER.decrementAndGet(this);
 
                         cleanBucket(bucket);
                         return true;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 880ef38..d61feba 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Lists;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -182,6 +183,8 @@ public class ConcurrentOpenHashMap<K, V> {
         private volatile Object[] table;
 
         private volatile int capacity;
+        private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
+                AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -276,7 +279,7 @@ public class ConcurrentOpenHashMap<K, V> {
 
                         table[bucket] = key;
                         table[bucket + 1] = value;
-                        ++size;
+                        SIZE_UPDATER.incrementAndGet(this);
                         return valueProvider != null ? value : null;
                     } else if (storedKey == DeletedKey) {
                         // The bucket contained a different deleted key
@@ -310,7 +313,7 @@ public class ConcurrentOpenHashMap<K, V> {
                     V storedValue = (V) table[bucket + 1];
                     if (key.equals(storedKey)) {
                         if (value == null || value.equals(storedValue)) {
-                            --size;
+                            SIZE_UPDATER.decrementAndGet(this);
 
                             int nextInArray = (bucket + 2) & (table.length - 1);
                             if (table[nextInArray] == EmptyKey) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 576637d..3587d8b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -177,6 +178,8 @@ public class ConcurrentOpenHashSet<V> {
         private volatile V[] values;
 
         private volatile int capacity;
+        private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
+                AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -270,7 +273,7 @@ public class ConcurrentOpenHashSet<V> {
                         }
 
                         values[bucket] = value;
-                        ++size;
+                        SIZE_UPDATER.incrementAndGet(this);
                         return true;
                     } else if (storedValue == DeletedValue) {
                         // The bucket contained a different deleted key
@@ -305,7 +308,7 @@ public class ConcurrentOpenHashSet<V> {
 
                     V storedValue = values[bucket];
                     if (value.equals(storedValue)) {
-                        --size;
+                        SIZE_UPDATER.decrementAndGet(this);
                         cleanBucket(bucket);
                         return true;
                     } else if (storedValue == EmptyValue) {
@@ -345,7 +348,7 @@ public class ConcurrentOpenHashSet<V> {
                     if (storedValue != DeletedValue && storedValue != EmptyValue) {
                         if (filter.test(storedValue)) {
                             // Removing item
-                            --size;
+                            SIZE_UPDATER.decrementAndGet(this);
                             ++removedCount;
                             cleanBucket(bucket);
                         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
index 5854892..89a35f4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /**
  * An unbounded priority queue based on a min heap where values are composed of pairs of longs.
@@ -37,6 +38,8 @@ public class GrowablePriorityLongPairQueue {
 
     private long[] data;
     private int capacity;
+    private static final AtomicIntegerFieldUpdater<GrowablePriorityLongPairQueue> SIZE_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(GrowablePriorityLongPairQueue.class, "size");
     private volatile int size = 0;
     private static final long EmptyItem = -1L;
 
@@ -204,7 +207,7 @@ public class GrowablePriorityLongPairQueue {
             LongPair item = new LongPair(data[index], data[index + 1]);
             data[index] = EmptyItem;
             data[index + 1] = EmptyItem;
-            --this.size;
+            SIZE_UPDATER.decrementAndGet(this);
             int lastIndex = this.size << 1;
             swap(index, lastIndex);
             minHeapify(index, lastIndex - 2);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java
index 9c052ef..824a0ed 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.functions.windowing.EvictionPolicy;
 import org.apache.commons.lang3.tuple.Pair;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 /**
  * An eviction policy that tracks count based on watermark ts and
@@ -37,6 +38,8 @@ public class WatermarkCountEvictionPolicy<T>
     protected final AtomicLong currentCount;
     private EvictionContext context;
 
+    private static final AtomicLongFieldUpdater<WatermarkCountEvictionPolicy> PROCESSED_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(WatermarkCountEvictionPolicy.class, "processed");
     private volatile long processed;
 
     public WatermarkCountEvictionPolicy(int count) {
@@ -58,7 +61,7 @@ public class WatermarkCountEvictionPolicy<T>
         if (event.getTimestamp() <= getContext().getReferenceTime() && processed < currentCount.get()) {
             action = doEvict(event);
             if (action == Action.PROCESS) {
-                ++processed;
+                PROCESSED_UPDATER.incrementAndGet(this);
             }
         } else {
             action = Action.KEEP;