You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/26 18:17:56 UTC
[pulsar] branch master updated: Fix non-atomic volatile update
(#6606)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d298c8b Fix non-atomic volatile update (#6606)
d298c8b is described below
commit d298c8b3bfe42643d2e6b5f0ba0b606bf4a1294d
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Fri Mar 27 02:17:41 2020 +0800
Fix non-atomic volatile update (#6606)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 30 ++++++++++++++++------
.../mledger/impl/ReadOnlyCursorImpl.java | 3 ++-
.../apache/pulsar/broker/service/ServerCnx.java | 7 +++--
.../PersistentDispatcherMultipleConsumers.java | 6 +++--
...istentStickyKeyDispatcherMultipleConsumers.java | 2 +-
.../client/impl/BatchMessageContainerImpl.java | 2 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 12 +++++++--
.../client/impl/PulsarServiceNameResolver.java | 5 +++-
.../util/collections/ConcurrentLongHashMap.java | 12 ++++++---
.../util/collections/ConcurrentLongPairSet.java | 7 +++--
.../util/collections/ConcurrentOpenHashMap.java | 7 +++--
.../util/collections/ConcurrentOpenHashSet.java | 9 ++++---
.../collections/GrowablePriorityLongPairQueue.java | 5 +++-
.../evictors/WatermarkCountEvictionPolicy.java | 5 +++-
14 files changed, 81 insertions(+), 31 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index e9ab776..97dd1b34 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
@@ -106,7 +107,13 @@ public class ManagedCursorImpl implements ManagedCursor {
private final BookKeeper.DigestType digestType;
protected volatile PositionImpl markDeletePosition;
+
+ protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
protected volatile PositionImpl readPosition;
+
+ protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry> LAST_MARK_DELETE_ENTRY_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, MarkDeleteEntry.class, "lastMarkDeleteEntry");
protected volatile MarkDeleteEntry lastMarkDeleteEntry;
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
@@ -125,6 +132,8 @@ public class ManagedCursorImpl implements ManagedCursor {
@SuppressWarnings("unused")
private volatile int pendingReadOps = 0;
+ private static final AtomicLongFieldUpdater<ManagedCursorImpl> MSG_CONSUMED_COUNTER_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(ManagedCursorImpl.class, "messagesConsumedCounter");
// This counters are used to compute the numberOfEntries and numberOfEntriesInBacklog values, without having to look
// at the list of ledgers in the ml. They are initialized to (-backlog) at opening, and will be incremented each
// time a message is read or deleted.
@@ -884,11 +893,11 @@ public class ManagedCursorImpl implements ManagedCursor {
PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition);
if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
- messagesConsumedCounter -= getNumberOfEntries(
- Range.closedOpen(newMarkDeletePosition, markDeletePosition));
+ MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
+ Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
} else {
- messagesConsumedCounter += getNumberOfEntries(
- Range.closedOpen(markDeletePosition, newMarkDeletePosition));
+ MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries(
+ Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
}
markDeletePosition = newMarkDeletePosition;
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
@@ -1428,7 +1437,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}] Moved ack position from: {} to: {} -- skipped: {}", ledger.getName(),
oldMarkDeletePosition, newMarkDeletePosition, skippedEntries);
}
- messagesConsumedCounter += skippedEntries;
+ MSG_CONSUMED_COUNTER_UPDATER.addAndGet(this, skippedEntries);
}
// markDelete-position and clear out deletedMsgSet
@@ -1720,7 +1729,7 @@ public class ManagedCursorImpl implements ManagedCursor {
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
- ++messagesConsumedCounter;
+ MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
@@ -1766,8 +1775,9 @@ public class ManagedCursorImpl implements ManagedCursor {
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
- lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, lastMarkDeleteEntry.properties, null,
- null);
+ PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition;
+ LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this,
+ last -> new MarkDeleteEntry(finalNewMarkDeletePosition, last.properties, null, null));
callback.deleteComplete(ctx);
return;
}
@@ -2666,5 +2676,9 @@ public class ManagedCursorImpl implements ManagedCursor {
|| individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()));
}
+ private ManagedCursorImpl cursorImpl() {
+ return this;
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 8c9da1d..533b2f8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -54,7 +54,8 @@ public class ReadOnlyCursorImpl extends ManagedCursorImpl implements ReadOnlyCur
@Override
public void skipEntries(int numEntriesToSkip) {
log.info("[{}] Skipping {} entries on read-only cursor {}", ledger.getName(), numEntriesToSkip);
- readPosition = ledger.getPositionAfterN(readPosition, numEntriesToSkip, PositionBound.startIncluded).getNext();
+ READ_POSITION_UPDATER.getAndUpdate(this, lastRead ->
+ ledger.getPositionAfterN(lastRead, numEntriesToSkip, PositionBound.startIncluded).getNext());
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 98970e8..7732bd8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,6 +41,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
@@ -161,6 +162,8 @@ public class ServerCnx extends PulsarHandler {
private FeatureFlags features;
// Flag to manage throttling-publish-buffer by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledPublishBufferLimiting = false;
+ private static final AtomicLongFieldUpdater<ServerCnx> MSG_PUBLISH_BUFFER_SIZE_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(ServerCnx.class, "messagePublishBufferSize");
private volatile long messagePublishBufferSize = 0;
enum State {
@@ -1759,7 +1762,7 @@ public class ServerCnx extends PulsarHandler {
}
public void startSendOperation(Producer producer, int msgSize) {
- messagePublishBufferSize += msgSize;
+ MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, msgSize);
boolean isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
if (++pendingSendRequest == maxPendingSendRequests || isPublishRateExceeded) {
// When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
@@ -1775,7 +1778,7 @@ public class ServerCnx extends PulsarHandler {
}
public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
- messagePublishBufferSize -= msgSize;
+ MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize);
if (--pendingSendRequest == resumeReadsThreshold) {
// Resume reading from socket
ctx.channel().config().setAutoRead(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9457af3..65e9a32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -88,6 +88,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
+ protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
protected volatile int totalAvailablePermits = 0;
private volatile int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
@@ -500,10 +502,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
c.sendMessages(entriesForThisConsumer, batchSizes, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), redeliveryTracker);
- long msgSent = sendMessageInfo.getTotalMessages();
+ int msgSent = sendMessageInfo.getTotalMessages();
start += messagesForC;
entriesToDispatch -= messagesForC;
- totalAvailablePermits -= msgSent;
+ TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -msgSent);
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 7953942..abe0cf7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -116,7 +116,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
});
entriesWithSameKey.getValue().removeAll(subList);
- totalAvailablePermits -= sendMessageInfo.getTotalMessages();
+ TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -sendMessageInfo.getTotalMessages());
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index bcbd1c3..937a3e9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -88,7 +88,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
messageMetadata.setSequenceId(lowestSequenceId);
}
highestSequenceId = msg.getSequenceId();
- producer.lastSequenceIdPushed = Math.max(producer.lastSequenceIdPushed, msg.getSequenceId());
+ ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
return isBatchFull();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5b8996c..977a7b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -111,7 +111,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private final CompressionCodec compressor;
+ static final AtomicLongFieldUpdater<ProducerImpl> LAST_SEQ_ID_PUBLISHED_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(ProducerImpl.class, "lastSequenceIdPublished");
private volatile long lastSequenceIdPublished;
+
+ static final AtomicLongFieldUpdater<ProducerImpl> LAST_SEQ_ID_PUSHED_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(ProducerImpl.class, "lastSequenceIdPushed");
protected volatile long lastSequenceIdPushed;
private volatile boolean isLastSequenceIdPotentialDuplicated;
@@ -847,7 +852,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (callback) {
op = pendingCallbacks.poll();
if (op != null) {
- lastSequenceIdPublished = Math.max(lastSequenceIdPublished, getHighestSequenceId(op));
+ OpSendMsg finalOp = op;
+ LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this,
+ last -> Math.max(last, getHighestSequenceId(finalOp)));
op.setMessageId(ledgerId, entryId, partitionIndex);
try {
// Need to protect ourselves from any exception being thrown in the future handler from the
@@ -1481,7 +1488,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
pendingMessages.put(op);
if (op.msg != null) {
- lastSequenceIdPushed = Math.max(lastSequenceIdPushed, getHighestSequenceId(op));
+ LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
+ last -> Math.max(last, getHighestSequenceId(op)));
}
ClientCnx cnx = cnx();
if (isConnected()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index 443cfc5..a3f3872 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -26,6 +26,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.common.net.ServiceURI;
@@ -38,6 +39,8 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
private volatile ServiceURI serviceUri;
private volatile String serviceUrl;
+ private static final AtomicIntegerFieldUpdater<PulsarServiceNameResolver> CURRENT_INDEX_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(PulsarServiceNameResolver.class, "currentIndex");
private volatile int currentIndex;
private volatile List<InetSocketAddress> addressList;
@@ -51,7 +54,7 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
if (list.size() == 1) {
return list.get(0);
} else {
- currentIndex = (currentIndex + 1) % list.size();
+ CURRENT_INDEX_UPDATER.getAndUpdate(this, last -> (last + 1) % list.size());
return list.get(currentIndex);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index 14a6fda..3b3716c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.StampedLock;
import java.util.function.LongFunction;
@@ -195,6 +196,9 @@ public class ConcurrentLongHashMap<V> {
private volatile V[] values;
private volatile int capacity;
+ private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
+
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -282,12 +286,12 @@ public class ConcurrentLongHashMap<V> {
if (storedKey == key) {
if (storedValue == EmptyValue) {
values[bucket] = value != null ? value : valueProvider.apply(key);
- ++size;
+ SIZE_UPDATER.incrementAndGet(this);
++usedBuckets;
return valueProvider != null ? values[bucket] : null;
} else if (storedValue == DeletedValue) {
values[bucket] = value != null ? value : valueProvider.apply(key);
- ++size;
+ SIZE_UPDATER.incrementAndGet(this);
return valueProvider != null ? values[bucket] : null;
} else if (!onlyIfAbsent) {
// Over written an old value for same key
@@ -307,7 +311,7 @@ public class ConcurrentLongHashMap<V> {
keys[bucket] = key;
values[bucket] = value != null ? value : valueProvider.apply(key);
- ++size;
+ SIZE_UPDATER.incrementAndGet(this);
return valueProvider != null ? values[bucket] : null;
} else if (storedValue == DeletedValue) {
// The bucket contained a different deleted key
@@ -348,7 +352,7 @@ public class ConcurrentLongHashMap<V> {
return null;
}
- --size;
+ SIZE_UPDATER.decrementAndGet(this);
V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
if (nextValueInArray == EmptyValue) {
values[bucket] = (V) EmptyValue;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index 5e2d499..0b94206 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.StampedLock;
/**
@@ -214,6 +215,8 @@ public class ConcurrentLongPairSet implements LongPairSet {
private volatile long[] table;
private volatile int capacity;
+ private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER = AtomicIntegerFieldUpdater
+ .newUpdater(Section.class, "size");
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -300,7 +303,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
table[bucket] = item1;
table[bucket + 1] = item2;
- ++size;
+ SIZE_UPDATER.incrementAndGet(this);
return true;
} else if (storedItem1 == DeletedItem) {
// The bucket contained a different deleted key
@@ -333,7 +336,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
long storedItem1 = table[bucket];
long storedItem2 = table[bucket + 1];
if (item1 == storedItem1 && item2 == storedItem2) {
- --size;
+ SIZE_UPDATER.decrementAndGet(this);
cleanBucket(bucket);
return true;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 880ef38..d61feba 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -182,6 +183,8 @@ public class ConcurrentOpenHashMap<K, V> {
private volatile Object[] table;
private volatile int capacity;
+ private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -276,7 +279,7 @@ public class ConcurrentOpenHashMap<K, V> {
table[bucket] = key;
table[bucket + 1] = value;
- ++size;
+ SIZE_UPDATER.incrementAndGet(this);
return valueProvider != null ? value : null;
} else if (storedKey == DeletedKey) {
// The bucket contained a different deleted key
@@ -310,7 +313,7 @@ public class ConcurrentOpenHashMap<K, V> {
V storedValue = (V) table[bucket + 1];
if (key.equals(storedKey)) {
if (value == null || value.equals(storedValue)) {
- --size;
+ SIZE_UPDATER.decrementAndGet(this);
int nextInArray = (bucket + 2) & (table.length - 1);
if (table[nextInArray] == EmptyKey) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 576637d..3587d8b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -177,6 +178,8 @@ public class ConcurrentOpenHashSet<V> {
private volatile V[] values;
private volatile int capacity;
+ private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -270,7 +273,7 @@ public class ConcurrentOpenHashSet<V> {
}
values[bucket] = value;
- ++size;
+ SIZE_UPDATER.incrementAndGet(this);
return true;
} else if (storedValue == DeletedValue) {
// The bucket contained a different deleted key
@@ -305,7 +308,7 @@ public class ConcurrentOpenHashSet<V> {
V storedValue = values[bucket];
if (value.equals(storedValue)) {
- --size;
+ SIZE_UPDATER.decrementAndGet(this);
cleanBucket(bucket);
return true;
} else if (storedValue == EmptyValue) {
@@ -345,7 +348,7 @@ public class ConcurrentOpenHashSet<V> {
if (storedValue != DeletedValue && storedValue != EmptyValue) {
if (filter.test(storedValue)) {
// Removing item
- --size;
+ SIZE_UPDATER.decrementAndGet(this);
++removedCount;
cleanBucket(bucket);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
index 5854892..89a35f4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* An unbounded priority queue based on a min heap where values are composed of pairs of longs.
@@ -37,6 +38,8 @@ public class GrowablePriorityLongPairQueue {
private long[] data;
private int capacity;
+ private static final AtomicIntegerFieldUpdater<GrowablePriorityLongPairQueue> SIZE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(GrowablePriorityLongPairQueue.class, "size");
private volatile int size = 0;
private static final long EmptyItem = -1L;
@@ -204,7 +207,7 @@ public class GrowablePriorityLongPairQueue {
LongPair item = new LongPair(data[index], data[index + 1]);
data[index] = EmptyItem;
data[index + 1] = EmptyItem;
- --this.size;
+ SIZE_UPDATER.decrementAndGet(this);
int lastIndex = this.size << 1;
swap(index, lastIndex);
minHeapify(index, lastIndex - 2);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java
index 9c052ef..824a0ed 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.functions.windowing.EvictionPolicy;
import org.apache.commons.lang3.tuple.Pair;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/**
* An eviction policy that tracks count based on watermark ts and
@@ -37,6 +38,8 @@ public class WatermarkCountEvictionPolicy<T>
protected final AtomicLong currentCount;
private EvictionContext context;
+ private static final AtomicLongFieldUpdater<WatermarkCountEvictionPolicy> PROCESSED_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(WatermarkCountEvictionPolicy.class, "processed");
private volatile long processed;
public WatermarkCountEvictionPolicy(int count) {
@@ -58,7 +61,7 @@ public class WatermarkCountEvictionPolicy<T>
if (event.getTimestamp() <= getContext().getReferenceTime() && processed < currentCount.get()) {
action = doEvict(event);
if (action == Action.PROCESS) {
- ++processed;
+ PROCESSED_UPDATER.incrementAndGet(this);
}
} else {
action = Action.KEEP;