You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/29 05:17:11 UTC

[GitHub] sijie closed pull request #1299: [dlog] use Atomic***FieldUpdater and LongAdder if possible

sijie closed pull request #1299: [dlog] use Atomic***FieldUpdater and LongAdder if possible
URL: https://github.com/apache/bookkeeper/pull/1299
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the diff is supplied below:

diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
index 2c89d6442..d8c72af24 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
+++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
@@ -20,7 +20,7 @@
 import com.google.common.base.Ticker;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
@@ -30,7 +30,7 @@
 
     private static final long NANOS_PER_SEC = TimeUnit.SECONDS.toNanos(1);
 
-    private final AtomicLong total;
+    private final LongAdder total;
     private final Ticker ticker;
     private final double scaleFactor;
     private final LinkedBlockingDeque<Pair<Long, Long>> samples;
@@ -45,7 +45,7 @@ public SampledMovingAverageRate(int intervalSecs) {
                              double scaleFactor,
                              Ticker ticker) {
         this.value = 0;
-        this.total = new AtomicLong(0);
+        this.total = new LongAdder();
         this.scaleFactor = scaleFactor;
         this.ticker = ticker;
         this.samples = new LinkedBlockingDeque<>(intervalSecs);
@@ -58,7 +58,7 @@ public double get() {
 
     @Override
     public void add(long amount) {
-        total.getAndAdd(amount);
+        total.add(amount);
     }
 
     @Override
@@ -71,7 +71,7 @@ void sample() {
     }
 
     private double doSample() {
-        long newSample = total.get();
+        long newSample = total.sum();
         long newTimestamp = ticker.read();
 
         double rate = 0;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 337389633..6adcbc420 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -28,8 +28,8 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -77,12 +77,16 @@
     private final String streamName;
     protected final BKDistributedLogManager bkDistributedLogManager;
     protected final BKLogReadHandler readHandler;
-    private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
+    private static final AtomicReferenceFieldUpdater<BKAsyncLogReader, Throwable> lastExceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(BKAsyncLogReader.class, Throwable.class, "lastException");
+    private volatile Throwable lastException = null;
     private final OrderedScheduler scheduler;
     private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests =
             new ConcurrentLinkedQueue<PendingReadRequest>();
     private final Object scheduleLock = new Object();
-    private final AtomicLong scheduleCount = new AtomicLong(0);
+    private static final AtomicLongFieldUpdater<BKAsyncLogReader> scheduleCountUpdater =
+        AtomicLongFieldUpdater.newUpdater(BKAsyncLogReader.class, "scheduleCount");
+    private volatile long scheduleCount = 0L;
     private final Stopwatch scheduleDelayStopwatch;
     private final Stopwatch readNextDelayStopwatch;
     private DLSN startDLSN;
@@ -340,7 +344,7 @@ public synchronized DLSN getStartDLSN() {
     }
 
     private boolean checkClosedOrInError(String operation) {
-        if (null == lastException.get()) {
+        if (null == lastExceptionUpdater.get(this)) {
             try {
                 if (null != readHandler && null != getReadAheadReader()) {
                     getReadAheadReader().checkLastException();
@@ -360,9 +364,10 @@ private boolean checkClosedOrInError(String operation) {
             }
         }
 
-        if (null != lastException.get()) {
+        Throwable cause = lastExceptionUpdater.get(this);
+        if (null != cause) {
             LOG.trace("Cancelling pending reads");
-            cancelAllPendingReads(lastException.get());
+            cancelAllPendingReads(cause);
             return true;
         }
 
@@ -370,7 +375,7 @@ private boolean checkClosedOrInError(String operation) {
     }
 
     private void setLastException(IOException exc) {
-        lastException.compareAndSet(null, exc);
+        lastExceptionUpdater.compareAndSet(this, null, exc);
     }
 
     @Override
@@ -446,7 +451,7 @@ public void onFailure(Throwable cause) {
         }
 
         if (checkClosedOrInError("readNext")) {
-            readRequest.completeExceptionally(lastException.get());
+            readRequest.completeExceptionally(lastExceptionUpdater.get(this));
         } else {
             boolean queueEmpty = pendingRequests.isEmpty();
             pendingRequests.add(readRequest);
@@ -469,7 +474,7 @@ public synchronized void scheduleBackgroundRead() {
             return;
         }
 
-        long prevCount = scheduleCount.getAndIncrement();
+        long prevCount = scheduleCountUpdater.getAndIncrement(this);
         if (0 == prevCount) {
             scheduleDelayStopwatch.reset().start();
             scheduler.submitOrdered(streamName, this);
@@ -574,7 +579,7 @@ public void safeRun() {
 
             Stopwatch runTime = Stopwatch.createStarted();
             int iterations = 0;
-            long scheduleCountLocal = scheduleCount.get();
+            long scheduleCountLocal = scheduleCountUpdater.get(this);
             LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
             while (true) {
                 if (LOG.isTraceEnabled()) {
@@ -588,7 +593,7 @@ public void safeRun() {
                     // Queue is empty, nothing to read, return
                     if (null == nextRequest) {
                         LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
-                        scheduleCount.set(0);
+                        scheduleCountUpdater.set(this, 0);
                         backgroundReaderRunTime.registerSuccessfulEvent(
                             runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                         return;
@@ -605,7 +610,7 @@ public void safeRun() {
                 // If the oldest pending promise is interrupted then we must mark
                 // the reader in error and abort all pending reads since we dont
                 // know the last consumed read
-                if (null == lastException.get()) {
+                if (null == lastExceptionUpdater.get(this)) {
                     if (nextRequest.getPromise().isCancelled()) {
                         setLastException(new DLInterruptedException("Interrupted on reading "
                             + readHandler.getFullyQualifiedName()));
@@ -613,8 +618,9 @@ public void safeRun() {
                 }
 
                 if (checkClosedOrInError("readNext")) {
-                    if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
-                        LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
+                    Throwable lastException = lastExceptionUpdater.get(this);
+                    if (lastException != null && !(lastException.getCause() instanceof LogNotFoundException)) {
+                        LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException);
                     }
                     backgroundReaderRunTime.registerFailedEvent(
                         runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
@@ -659,7 +665,7 @@ public void safeRun() {
                     setLastException(exc);
                     if (!(exc instanceof LogNotFoundException)) {
                         LOG.warn("{} : read with skip Exception",
-                                readHandler.getFullyQualifiedName(), lastException.get());
+                                readHandler.getFullyQualifiedName(), lastExceptionUpdater.get(this));
                     }
                     continue;
                 }
@@ -670,7 +676,7 @@ public void safeRun() {
                         backgroundReaderRunTime.registerSuccessfulEvent(
                             runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                         scheduleDelayStopwatch.reset().start();
-                        scheduleCount.set(0);
+                        scheduleCountUpdater.set(this, 0);
                         // the request could still wait for more records
                         backgroundScheduleTask = scheduler.scheduleOrdered(
                                 streamName,
@@ -702,12 +708,12 @@ public void safeRun() {
                     }
                 } else {
                     if (0 == scheduleCountLocal) {
-                        LOG.trace("Schedule count dropping to zero", lastException.get());
+                        LOG.trace("Schedule count dropping to zero", lastExceptionUpdater.get(this));
                         backgroundReaderRunTime.registerSuccessfulEvent(
                             runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                         return;
                     }
-                    scheduleCountLocal = scheduleCount.decrementAndGet();
+                    scheduleCountLocal = scheduleCountUpdater.decrementAndGet(this);
                 }
             }
         }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
index afcd7bb57..a319d44a7 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
@@ -31,7 +31,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -97,7 +97,9 @@
     protected final AlertStatsLogger alertStatsLogger;
     protected volatile boolean reportGetSegmentStats = false;
     private final String lockClientId;
-    protected final AtomicReference<IOException> metadataException = new AtomicReference<IOException>(null);
+    protected static final AtomicReferenceFieldUpdater<BKLogHandler, IOException> METADATA_EXCEPTION_UPDATER =
+        AtomicReferenceFieldUpdater.newUpdater(BKLogHandler.class, IOException.class, "metadataException");
+    private volatile IOException metadataException = null;
 
     // Maintain the list of log segments per stream
     protected final PerStreamLogSegmentCache logSegmentCache;
@@ -155,8 +157,9 @@
     }
 
     BKLogHandler checkMetadataException() throws IOException {
-        if (null != metadataException.get()) {
-            throw metadataException.get();
+        IOException ioe = METADATA_EXCEPTION_UPDATER.get(this);
+        if (null != ioe) {
+            throw ioe;
         }
         return this;
     }
@@ -480,7 +483,7 @@ public String getFullyQualifiedName() {
             // the log segments cache went wrong
             LOG.error("Unexpected exception on getting log segments from the cache for stream {}",
                     getFullyQualifiedName(), ue);
-            metadataException.compareAndSet(null, ue);
+            METADATA_EXCEPTION_UPDATER.compareAndSet(this, null, ue);
             throw ue;
         }
     }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
index 66ca2de88..1175f39c9 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
@@ -272,7 +272,7 @@ public void onFailure(Throwable cause) {
                         || cause instanceof LogSegmentNotFoundException
                         || cause instanceof UnexpectedException) {
                     // indicate some inconsistent behavior, abort
-                    metadataException.compareAndSet(null, (IOException) cause);
+                    METADATA_EXCEPTION_UPDATER.compareAndSet(BKLogReadHandler.this, null, (IOException) cause);
                     // notify the reader that read handler is in error state
                     notifyReaderOnError(cause);
                     FutureUtils.completeExceptionally(promise, cause);
@@ -318,7 +318,7 @@ public void onFailure(Throwable cause) {
                         || cause instanceof LogSegmentNotFoundException
                         || cause instanceof UnexpectedException) {
                     // indicate some inconsistent behavior, abort
-                    metadataException.compareAndSet(null, (IOException) cause);
+                    METADATA_EXCEPTION_UPDATER.compareAndSet(BKLogReadHandler.this, null, (IOException) cause);
                     // notify the reader that read handler is in error state
                     notifyReaderOnError(cause);
                     return;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index 6263ddf76..4ad977b21 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -34,8 +34,8 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -53,6 +53,7 @@
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.distributedlog.Entry.Writer;
 import org.apache.distributedlog.common.stats.OpStatsListener;
 import org.apache.distributedlog.common.util.PermitLimiter;
@@ -158,13 +159,16 @@ public void abortTransmit(Throwable reason) {
     private final int logSegmentMetadataVersion;
     private BKTransmitPacket packetPrevious;
     private Entry.Writer recordSetWriter;
-    private final AtomicInteger outstandingTransmits;
+    private static final AtomicIntegerFieldUpdater<BKLogSegmentWriter> outstandingTransmitsUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentWriter.class, "outstandingTransmits");
+    private volatile int outstandingTransmits = 0;
     private final int transmissionThreshold;
     protected final LogSegmentEntryWriter entryWriter;
     private final CompressionCodec.Type compressionType;
     private final ReentrantLock transmitLock = new ReentrantLock();
-    private final AtomicInteger transmitResult =
-            new AtomicInteger(BKException.Code.OK);
+    private static final AtomicIntegerFieldUpdater<BKLogSegmentWriter> transmitResultUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentWriter.class, "transmitResult");
+    private volatile int transmitResult = BKException.Code.OK;
     private final DistributedLock lock;
     private final boolean isDurableWriteEnabled;
     private DLSN lastDLSN = DLSN.InvalidDLSN;
@@ -188,11 +192,24 @@ public void abortTransmit(Throwable reason) {
     private boolean streamEnded = false;
     private final ScheduledFuture<?> periodicFlushSchedule;
     private final ScheduledFuture<?> periodicKeepAliveSchedule;
-    private final AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef =
-            new AtomicReference<ScheduledFuture<?>>(null);
-    private final AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef =
-            new AtomicReference<ScheduledFuture<?>>(null);
-    private final AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null);
+    private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter, ScheduledFuture>
+        transmitSchedFutureRefUpdater = AtomicReferenceFieldUpdater.newUpdater(
+            BKLogSegmentWriter.class,
+            ScheduledFuture.class,
+            "transmitSchedFutureRef");
+    private volatile ScheduledFuture transmitSchedFutureRef = null;
+    private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter, ScheduledFuture>
+        immFlushSchedFutureRefUpdater = AtomicReferenceFieldUpdater.newUpdater(
+            BKLogSegmentWriter.class,
+            ScheduledFuture.class,
+            "immFlushSchedFutureRef");
+    private volatile ScheduledFuture immFlushSchedFutureRef = null;
+    private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter, Exception> scheduledFlushExceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(
+            BKLogSegmentWriter.class,
+            Exception.class,
+            "scheduledFlushException");
+    private volatile Exception scheduledFlushException = null;
     private boolean enforceLock = true;
     private CompletableFuture<Void> closeFuture = null;
     private final boolean enableRecordCounts;
@@ -300,12 +317,11 @@ public Number getDefaultValue() {
             }
             @Override
             public Number getSample() {
-                return outstandingTransmits.get();
+                return outstandingTransmitsUpdater.get(BKLogSegmentWriter.this);
             }
         };
         transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge);
 
-        outstandingTransmits = new AtomicInteger(0);
         this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
         this.streamName = streamName;
         this.logSegmentMetadataVersion = logSegmentMetadataVersion;
@@ -395,7 +411,7 @@ ScheduledExecutorService getFuturePool() {
 
     @VisibleForTesting
     void setTransmitResult(int rc) {
-        transmitResult.set(rc);
+        transmitResultUpdater.set(this, rc);
     }
 
     @VisibleForTesting
@@ -510,7 +526,7 @@ private synchronized void abortPacket(BKTransmitPacket packet) {
         if (null != packet) {
             EntryBuffer recordSet = packet.getRecordSet();
             numRecords = recordSet.getNumRecords();
-            int rc = transmitResult.get();
+            int rc = transmitResultUpdater.get(this);
             if (BKException.Code.OK == rc) {
                 rc = BKException.Code.InterruptedException;
             }
@@ -537,13 +553,13 @@ private synchronized long getWritesPendingTransmit() {
             closePromise = closeFuture = new CompletableFuture<Void>();
         }
 
-        AtomicReference<Throwable> throwExc = new AtomicReference<Throwable>(null);
+        MutableObject<Throwable> throwExc = new MutableObject<>(null);
         closeInternal(abort, throwExc, closePromise);
         return closePromise;
     }
 
     private void closeInternal(final boolean abort,
-                               final AtomicReference<Throwable> throwExc,
+                               final MutableObject<Throwable> throwExc,
                                final CompletableFuture<Void> closePromise) {
         // clean stats resources
         this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
@@ -579,7 +595,7 @@ public void onSuccess(Long value) {
 
                 @Override
                 public void onFailure(Throwable cause) {
-                    throwExc.set(cause);
+                    throwExc.setValue(cause);
                     abortTransmitPacketOnClose(abort, throwExc, closePromise);
                 }
             });
@@ -590,12 +606,12 @@ public void onFailure(Throwable cause) {
     }
 
     private void abortTransmitPacketOnClose(final boolean abort,
-                                            final AtomicReference<Throwable> throwExc,
+                                            final MutableObject<Throwable> throwExc,
                                             final CompletableFuture<Void> closePromise) {
         LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} :"
                         + " lastDLSN = {} outstandingTransmits = {} writesPendingTransmit = {}",
                 new Object[]{abort, fullyQualifiedLogSegment, getLastDLSN(),
-                        outstandingTransmits.get(), getWritesPendingTransmit()});
+                        outstandingTransmitsUpdater.get(this), getWritesPendingTransmit()});
 
         // Save the current packet to reset, leave a new empty packet to avoid a race with
         // addCompleteDeferredProcessing.
@@ -629,10 +645,10 @@ public void onFailure(Throwable cause) {
     }
 
     private void closeLedgerOnClose(final boolean abort,
-                                    final AtomicReference<Throwable> throwExc,
+                                    final MutableObject<Throwable> throwExc,
                                     final CompletableFuture<Void> closePromise) {
         // close the log segment if it isn't in error state, so all the outstanding addEntry(s) will callback.
-        if (null == throwExc.get() && !isLogSegmentInError()) {
+        if (null == throwExc.getValue() && !isLogSegmentInError()) {
             // Synchronous closing the ledger handle, if we couldn't close a ledger handle successfully.
             // we should throw the exception to #closeToFinalize, so it would fail completing a log segment.
             entryWriter.asyncClose(new CloseCallback() {
@@ -640,7 +656,7 @@ private void closeLedgerOnClose(final boolean abort,
                 public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
                     if (BKException.Code.OK != rc && BKException.Code.LedgerClosedException != rc) {
                         if (!abort) {
-                            throwExc.set(new IOException("Failed to close ledger for "
+                            throwExc.setValue(new IOException("Failed to close ledger for "
                                     + fullyQualifiedLogSegment + " : " + BKException.getMessage(rc)));
                         }
                     }
@@ -653,17 +669,17 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
     }
 
     private void completeClosePromise(final boolean abort,
-                                      final AtomicReference<Throwable> throwExc,
+                                      final MutableObject<Throwable> throwExc,
                                       final CompletableFuture<Void> closePromise) {
         // If add entry failed because of closing ledger above, we don't need to fail the close operation
-        if (!abort && null == throwExc.get() && shouldFailCompleteLogSegment()) {
-            throwExc.set(new BKTransmitException("Closing an errored stream : ", transmitResult.get()));
+        if (!abort && null == throwExc.getValue() && shouldFailCompleteLogSegment()) {
+            throwExc.setValue(new BKTransmitException("Closing an errored stream : ", transmitResultUpdater.get(this)));
         }
 
-        if (null == throwExc.get()) {
+        if (null == throwExc.getValue()) {
             FutureUtils.complete(closePromise, null);
         } else {
-            FutureUtils.completeExceptionally(closePromise, throwExc.get());
+            FutureUtils.completeExceptionally(closePromise, throwExc.getValue());
         }
     }
 
@@ -721,9 +737,9 @@ public synchronized  void write(LogRecord record) throws IOException {
                     BKException.getMessage(BKException.Code.LedgerClosedException));
         }
 
-        if (BKException.Code.OK != transmitResult.get()) {
+        if (BKException.Code.OK != transmitResultUpdater.get(this)) {
             // Failfast if the stream already encountered error with safe retry on the client
-            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(transmitResult.get()));
+            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(transmitResultUpdater.get(this)));
         }
 
         if (streamEnded) {
@@ -777,12 +793,12 @@ public synchronized  void write(LogRecord record) throws IOException {
     }
 
     boolean isLogSegmentInError() {
-        return (transmitResult.get() != BKException.Code.OK);
+        return (transmitResultUpdater.get(this) != BKException.Code.OK);
     }
 
     boolean shouldFailCompleteLogSegment() {
-        return (transmitResult.get() != BKException.Code.OK)
-                && (transmitResult.get() != BKException.Code.LedgerClosedException);
+        return (transmitResultUpdater.get(this) != BKException.Code.OK)
+                && (transmitResultUpdater.get(this) != BKException.Code.LedgerClosedException);
     }
 
     public synchronized CompletableFuture<DLSN> writeInternal(LogRecord record)
@@ -977,23 +993,24 @@ void flushIfNeededNoThrow() {
         }
     }
 
-    void scheduleFlushWithDelayIfNeeded(final Callable<?> callable,
-                                        final AtomicReference<ScheduledFuture<?>> scheduledFutureRef) {
+    void scheduleFlushWithDelayIfNeeded(
+            final Callable<?> callable,
+            final AtomicReferenceFieldUpdater<BKLogSegmentWriter, ScheduledFuture> scheduledFutureRefUpdater) {
         final long delayMs = Math.max(0, minDelayBetweenImmediateFlushMs - lastTransmit.elapsed(TimeUnit.MILLISECONDS));
-        final ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
+        final ScheduledFuture scheduledFuture = scheduledFutureRefUpdater.get(this);
         if ((null == scheduledFuture) || scheduledFuture.isDone()) {
-            scheduledFutureRef.set(scheduler.schedule(new Runnable() {
+            scheduledFutureRefUpdater.set(this, scheduler.schedule(new Runnable() {
                 @Override
                 public void run() {
                     synchronized (this) {
-                        scheduledFutureRef.set(null);
+                        scheduledFutureRefUpdater.set(BKLogSegmentWriter.this, null);
                         try {
                             callable.call();
 
                             // Flush was successful or wasn't needed, the exception should be unset.
-                            scheduledFlushException.set(null);
+                            scheduledFlushExceptionUpdater.set(BKLogSegmentWriter.this, null);
                         } catch (Exception exc) {
-                            scheduledFlushException.set(exc);
+                            scheduledFlushExceptionUpdater.set(BKLogSegmentWriter.this, exc);
                             LOG.error("Delayed flush failed", exc);
                         }
                     }
@@ -1017,15 +1034,16 @@ public Void call() throws Exception {
                         checkStateAndTransmit();
                         return null;
                     }
-                }, transmitSchedFutureRef);
+                }, transmitSchedFutureRefUpdater);
 
                 // Timing here is not very important--the last flush failed and we should
                 // indicate this to the caller. The next flush may succeed and unset the
                 // scheduledFlushException in which case the next write will succeed (if the caller
                 // hasn't already closed the writer).
-                if (scheduledFlushException.get() != null) {
+                Exception exec = scheduledFlushExceptionUpdater.get(this);
+                if (exec != null) {
                     throw new FlushException("Last flush encountered an error while writing data to the backend",
-                        getLastTxId(), getLastTxIdAcknowledged(), scheduledFlushException.get());
+                        getLastTxId(), getLastTxIdAcknowledged(), exec);
                 }
             }
         }
@@ -1069,14 +1087,14 @@ private void checkWriteLock() throws LockingException {
                 checkWriteLock();
                 // If transmitResult is anything other than BKException.Code.OK, it means that the
                 // stream has encountered an error and cannot be written to.
-                if (!transmitResult.compareAndSet(BKException.Code.OK,
+                if (!transmitResultUpdater.compareAndSet(this, BKException.Code.OK,
                                                   BKException.Code.OK)) {
                     LOG.error("Log Segment {} Trying to write to an errored stream; Error is {}",
                               fullyQualifiedLogSegment,
-                              BKException.getMessage(transmitResult.get()));
+                              BKException.getMessage(transmitResultUpdater.get(this)));
                     throw new BKTransmitException("Trying to write to an errored stream;"
-                                                          + " Error code : (" + transmitResult.get() + ") "
-                            + BKException.getMessage(transmitResult.get()), transmitResult.get());
+                                                          + " Error code : (" + transmitResultUpdater.get(this) + ") "
+                            + BKException.getMessage(transmitResultUpdater.get(this)), transmitResultUpdater.get(this));
                 }
 
                 if (recordSetWriter.getNumRecords() == 0) {
@@ -1107,7 +1125,7 @@ private void checkWriteLock() throws LockingException {
                           new Object[] {fullyQualifiedLogSegment}, e);
                 // If a write fails here, we need to set the transmit result to an error so that
                 // no future writes go through and violate ordering guarantees.
-                transmitResult.set(BKException.Code.WriteException);
+                transmitResultUpdater.set(this, BKException.Code.WriteException);
                 if (e instanceof InvalidEnvelopedEntryException) {
                     alertStatsLogger.raise("Invalid enveloped entry for segment {} : ", fullyQualifiedLogSegment, e);
                     throw (InvalidEnvelopedEntryException) e;
@@ -1131,7 +1149,7 @@ private void checkWriteLock() throws LockingException {
                 }
 
                 lastTransmit.reset().start();
-                outstandingTransmits.incrementAndGet();
+                outstandingTransmitsUpdater.incrementAndGet(this);
                 controlFlushNeeded = false;
                 return packet.getTransmitFuture();
             }
@@ -1145,7 +1163,7 @@ private void checkWriteLock() throws LockingException {
      *  flush task can determine if there is anything it needs to do.
      */
     private synchronized  boolean haveDataToTransmit() {
-        if (!transmitResult.compareAndSet(BKException.Code.OK, BKException.Code.OK)) {
+        if (!transmitResultUpdater.compareAndSet(this, BKException.Code.OK, BKException.Code.OK)) {
             // Even if there is data it cannot be transmitted, so effectively nothing to send
             return false;
         }
@@ -1156,14 +1174,15 @@ private synchronized  boolean haveDataToTransmit() {
     @Override
     public void addComplete(final int rc, LedgerHandle handle,
                             final long entryId, final Object ctx) {
-        final AtomicReference<Integer> effectiveRC = new AtomicReference<Integer>(rc);
+        int rcAfterFailPoint = rc;
         try {
             if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitComplete)) {
-                effectiveRC.set(BKException.Code.UnexpectedConditionException);
+                rcAfterFailPoint = BKException.Code.UnexpectedConditionException;
             }
         } catch (Exception exc) {
-            effectiveRC.set(BKException.Code.UnexpectedConditionException);
+            rcAfterFailPoint = BKException.Code.UnexpectedConditionException;
         }
+        final int effectiveRC = rcAfterFailPoint;
 
         // Sanity check to make sure we're receiving these callbacks in order.
         if (entryId > -1 && lastEntryId >= entryId) {
@@ -1199,7 +1218,7 @@ public Void call() {
                     addCompleteQueuedTime.registerSuccessfulEvent(
                         queuedTime.elapsed(TimeUnit.MICROSECONDS),
                         TimeUnit.MICROSECONDS);
-                    addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
+                    addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC);
                     addCompleteDeferredTime.registerSuccessfulEvent(
                         deferredTime.elapsed(TimeUnit.MICROSECONDS),
                         TimeUnit.MILLISECONDS);
@@ -1223,14 +1242,14 @@ public void onFailure(Throwable cause) {
                 }
             });
             // Race condition if we notify before the addComplete is enqueued.
-            transmitPacket.notifyTransmitComplete(effectiveRC.get());
-            outstandingTransmits.getAndDecrement();
+            transmitPacket.notifyTransmitComplete(effectiveRC);
+            outstandingTransmitsUpdater.getAndDecrement(this);
         } else {
             // Notify transmit complete must be called before deferred processing in the
             // sync case since otherwise callbacks in deferred processing may deadlock.
-            transmitPacket.notifyTransmitComplete(effectiveRC.get());
-            outstandingTransmits.getAndDecrement();
-            addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
+            transmitPacket.notifyTransmitComplete(effectiveRC);
+            outstandingTransmitsUpdater.getAndDecrement(this);
+            addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC);
         }
     }
 
@@ -1240,17 +1259,17 @@ private void addCompleteDeferredProcessing(final BKTransmitPacket transmitPacket
         boolean cancelPendingPromises = false;
         EntryBuffer recordSet = transmitPacket.getRecordSet();
         synchronized (this) {
-            if (transmitResult.compareAndSet(BKException.Code.OK, rc)) {
+            if (transmitResultUpdater.compareAndSet(this, BKException.Code.OK, rc)) {
                 // If this is the first time we are setting an error code in the transmitResult then
                 // we must cancel pending promises; once this error has been set, more records will not
                 // be enqueued; they will be failed with WriteException
                 cancelPendingPromises = (BKException.Code.OK != rc);
             } else {
                 LOG.warn("Log segment {} entryId {}: Tried to set transmit result to ({}) but is already ({})",
-                    new Object[] {fullyQualifiedLogSegment, entryId, rc, transmitResult.get()});
+                    new Object[] {fullyQualifiedLogSegment, entryId, rc, transmitResultUpdater.get(this)});
             }
 
-            if (transmitResult.get() != BKException.Code.OK) {
+            if (transmitResultUpdater.get(this) != BKException.Code.OK) {
                 if (recordSet.hasUserRecords()) {
                     transmitDataPacketSize.registerFailedEvent(
                         recordSet.getNumBytes(), TimeUnit.MICROSECONDS);
@@ -1273,14 +1292,14 @@ public Void call() throws Exception {
                                     backgroundFlush(true);
                                     return null;
                                 }
-                            }, immFlushSchedFutureRef);
+                            }, immFlushSchedFutureRefUpdater);
                         }
                     }
                 }
             }
 
             // update last dlsn before satisifying future
-            if (BKException.Code.OK == transmitResult.get()) {
+            if (BKException.Code.OK == transmitResultUpdater.get(this)) {
                 DLSN lastDLSNInPacket = recordSet.finalizeTransmit(
                         logSegmentSequenceNumber, entryId);
                 if (recordSet.hasUserRecords()) {
@@ -1291,10 +1310,10 @@ public Void call() throws Exception {
             }
         }
 
-        if (BKException.Code.OK == transmitResult.get()) {
+        if (BKException.Code.OK == transmitResultUpdater.get(this)) {
             recordSet.completeTransmit(logSegmentSequenceNumber, entryId);
         } else {
-            recordSet.abortTransmit(Utils.transmitException(transmitResult.get()));
+            recordSet.abortTransmit(Utils.transmitException(transmitResultUpdater.get(this)));
         }
 
         if (cancelPendingPromises) {
@@ -1308,7 +1327,7 @@ public Void call() throws Exception {
             }
             packetCurrentSaved.getRecordSet().abortTransmit(
                     new WriteCancelledException(streamName,
-                            Utils.transmitException(transmitResult.get())));
+                            Utils.transmitException(transmitResultUpdater.get(this))));
         }
     }
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index d62c8e73b..898c11382 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -30,9 +30,9 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -220,12 +220,12 @@ void processReadEntry(int rc,
          */
         boolean checkReturnCodeAndHandleFailure(int rc, boolean isLongPoll) {
             if (BKException.Code.OK == rc) {
-                numReadErrors.set(0);
+                numReadErrorsUpdater.set(BKLogSegmentEntryReader.this, 0);
                 return true;
             }
             if (BKException.Code.BookieHandleNotAvailableException == rc
                     || (isLongPoll && BKException.Code.NoSuchLedgerExistsException == rc)) {
-                int numErrors = Math.max(1, numReadErrors.incrementAndGet());
+                int numErrors = Math.max(1, numReadErrorsUpdater.incrementAndGet(BKLogSegmentEntryReader.this));
                 int nextReadBackoffTime = Math.min(numErrors * readAheadWaitTime, maxReadBackoffTime);
                 scheduler.scheduleOrdered(
                         getSegment().getLogSegmentId(),
@@ -301,15 +301,21 @@ boolean hasReadEnoughEntries() {
     private final List<LedgerHandle> openLedgerHandles;
     private CacheEntry outstandingLongPoll;
     private long nextEntryId;
-    private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(null);
-    private final AtomicLong scheduleCount = new AtomicLong(0);
+    private static final AtomicReferenceFieldUpdater<BKLogSegmentEntryReader, Throwable> lastExceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(BKLogSegmentEntryReader.class, Throwable.class, "lastException");
+    private volatile Throwable lastException = null;
+    private static final AtomicLongFieldUpdater<BKLogSegmentEntryReader> scheduleCountUpdater =
+        AtomicLongFieldUpdater.newUpdater(BKLogSegmentEntryReader.class, "scheduleCount");
+    private volatile long scheduleCount = 0L;
     private volatile boolean hasCaughtupOnInprogress = false;
     private final CopyOnWriteArraySet<StateChangeListener> stateChangeListeners =
             new CopyOnWriteArraySet<StateChangeListener>();
     // read retries
     private int readAheadWaitTime;
     private final int maxReadBackoffTime;
-    private final AtomicInteger numReadErrors = new AtomicInteger(0);
+    private static final AtomicIntegerFieldUpdater<BKLogSegmentEntryReader> numReadErrorsUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentEntryReader.class, "numReadErrors");
+    private volatile int numReadErrors = 0;
     private final boolean skipBrokenEntries;
     // readahead cache
     int cachedEntries = 0;
@@ -493,8 +499,9 @@ private void failOrRetryOpenLedger(int rc, final LogSegmentMetadata segment) {
     //
 
     private boolean checkClosedOrInError() {
-        if (null != lastException.get()) {
-            cancelAllPendingReads(lastException.get());
+        Throwable cause = lastExceptionUpdater.get(this);
+        if (null != cause) {
+            cancelAllPendingReads(cause);
             return true;
         }
         return false;
@@ -507,7 +514,7 @@ private boolean checkClosedOrInError() {
      * @param isBackground is the reader set exception by background reads or foreground reads
      */
     private void completeExceptionally(Throwable throwable, boolean isBackground) {
-        lastException.compareAndSet(null, throwable);
+        lastExceptionUpdater.compareAndSet(this, null, throwable);
         if (isBackground) {
             notifyReaders();
         }
@@ -662,7 +669,7 @@ private void issueLongPollRead(CacheEntry cacheEntry) {
         final PendingReadRequest readRequest = new PendingReadRequest(numEntries);
 
         if (checkClosedOrInError()) {
-            readRequest.completeExceptionally(lastException.get());
+            readRequest.completeExceptionally(lastExceptionUpdater.get(this));
         } else {
             boolean wasQueueEmpty;
             synchronized (readQueue) {
@@ -682,7 +689,7 @@ private void processReadRequests() {
             return;
         }
 
-        long prevCount = scheduleCount.getAndIncrement();
+        long prevCount = scheduleCountUpdater.getAndIncrement(this);
         if (0 == prevCount) {
             scheduler.submitOrdered(getSegment().getLogSegmentId(), this);
         }
@@ -693,7 +700,7 @@ private void processReadRequests() {
      */
     @Override
     public void safeRun() {
-        long scheduleCountLocal = scheduleCount.get();
+        long scheduleCountLocal = scheduleCountUpdater.get(this);
         while (true) {
             PendingReadRequest nextRequest = null;
             synchronized (readQueue) {
@@ -702,14 +709,14 @@ public void safeRun() {
 
             // if read queue is empty, nothing to read, return
             if (null == nextRequest) {
-                scheduleCount.set(0L);
+                scheduleCountUpdater.set(this, 0L);
                 return;
             }
 
             // if the oldest pending promise is interrupted then we must
             // mark the reader in error and abort all pending reads since
             // we don't know the last consumed read
-            if (null == lastException.get()) {
+            if (null == lastExceptionUpdater.get(this)) {
                 if (nextRequest.getPromise().isCancelled()) {
                     completeExceptionally(new DLInterruptedException("Interrupted on reading log segment "
                             + getSegment() + " : " + nextRequest.getPromise().isCancelled()), false);
@@ -745,7 +752,7 @@ public void safeRun() {
                 if (0 == scheduleCountLocal) {
                     return;
                 }
-                scheduleCountLocal = scheduleCount.decrementAndGet();
+                scheduleCountLocal = scheduleCountUpdater.decrementAndGet(this);
             }
         }
     }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
index c68000006..5a5651779 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
@@ -20,7 +20,7 @@
 import com.google.common.base.Charsets;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -42,7 +42,9 @@
 
     private final ZooKeeperClient zooKeeperClient;
     private final String zkPath;
-    private AtomicReference<DLSN> lastCommittedPosition = new AtomicReference<DLSN>(null);
+    private static final AtomicReferenceFieldUpdater<ZKSubscriptionStateStore, DLSN> lastCommittedPositionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(ZKSubscriptionStateStore.class, DLSN.class, "lastCommittedPosition");
+    private volatile DLSN lastCommittedPosition = null;
 
     public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String zkPath) {
         this.zooKeeperClient = zooKeeperClient;
@@ -58,8 +60,9 @@ public void close() throws IOException {
      */
     @Override
     public CompletableFuture<DLSN> getLastCommitPosition() {
-        if (null != lastCommittedPosition.get()) {
-            return FutureUtils.value(lastCommittedPosition.get());
+        DLSN dlsn = lastCommittedPositionUpdater.get(this);
+        if (null != dlsn) {
+            return FutureUtils.value(dlsn);
         } else {
             return getLastCommitPositionFromZK();
         }
@@ -105,9 +108,10 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta
      */
     @Override
     public CompletableFuture<Void> advanceCommitPosition(DLSN newPosition) {
-        if (null == lastCommittedPosition.get()
-                || (newPosition.compareTo(lastCommittedPosition.get()) > 0)) {
-            lastCommittedPosition.set(newPosition);
+        DLSN dlsn = lastCommittedPositionUpdater.get(this);
+        if (null == dlsn
+                || (newPosition.compareTo(dlsn) > 0)) {
+            lastCommittedPositionUpdater.set(this, newPosition);
             return Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient,
                 zkPath, newPosition.serialize().getBytes(Charsets.UTF_8),
                 zooKeeperClient.getDefaultACL(),
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index 81d721b9e..7948084fa 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -92,7 +93,9 @@
     private CompletableFuture<Void> closeFuture = null;
 
     // A counter to track how many re-acquires happened during a lock's life cycle.
-    private final AtomicInteger reacquireCount = new AtomicInteger(0);
+    private static final AtomicIntegerFieldUpdater<ZKDistributedLock> reacquireCountUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(ZKDistributedLock.class, "reacquireCount");
+    private volatile int reacquireCount = 0;
     private final StatsLogger lockStatsLogger;
     private final OpStatsLogger acquireStats;
     private final OpStatsLogger reacquireStats;
@@ -323,7 +326,7 @@ public synchronized void checkOwnership() throws LockingException {
 
     @VisibleForTesting
     int getReacquireCount() {
-        return reacquireCount.get();
+        return reacquireCountUpdater.get(this); // reads the volatile reacquireCount field
     }
 
     @VisibleForTesting
@@ -511,7 +514,7 @@ public void onFailure(Throwable cause) {
                 }
             });
         }
-        reacquireCount.incrementAndGet();
+        reacquireCountUpdater.incrementAndGet(this);
         internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0, lockPromise);
         return lockPromise;
     }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index f018e3602..3d7933631 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -31,7 +31,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -274,7 +274,9 @@ private Exception getStack() {
     private String currentNode;
     private String watchedNode;
     private LockWatcher watcher;
-    private final AtomicInteger epoch = new AtomicInteger(0);
+    private static final AtomicIntegerFieldUpdater<ZKSessionLock> epochUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(ZKSessionLock.class, "epoch");
+    private volatile int epoch = 0;
     private final OrderedScheduler lockStateExecutor;
     private LockListener lockListener = null;
     private final long lockOpTimeout;
@@ -358,9 +360,8 @@ String getLockPath() {
         return this.lockPath;
     }
 
-    @VisibleForTesting
-    AtomicInteger getEpoch() {
-        return epoch;
+    int getEpoch() {
+        return epochUpdater.get(this); // current value of the volatile epoch field
     }
 
     @VisibleForTesting
@@ -394,7 +395,7 @@ protected void executeLockAction(final int lockEpoch, final LockAction func) {
         lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
-                if (ZKSessionLock.this.epoch.get() == lockEpoch) {
+                if (getEpoch() == lockEpoch) {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("{} executing lock action '{}' under epoch {} for lock {}",
                                 new Object[]{lockId, func.getActionName(), lockEpoch, lockPath});
@@ -409,7 +410,7 @@ public void safeRun() {
                         LOG.trace("{} skipped executing lock action '{}' for lock {},"
                                         + " since epoch is changed from {} to {}.",
                                 new Object[]{lockId, func.getActionName(),
-                                        lockPath, lockEpoch, ZKSessionLock.this.epoch.get()});
+                                        lockPath, lockEpoch, getEpoch()});
                     }
                 }
             }
@@ -433,7 +434,7 @@ public void safeRun() {
         lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
-                int currentEpoch = ZKSessionLock.this.epoch.get();
+                int currentEpoch = getEpoch();
                 if (currentEpoch == lockEpoch) {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("{} executed lock action '{}' under epoch {} for lock {}",
@@ -656,7 +657,7 @@ public void safeRun() {
             return false;
         }
         // current owner is itself
-        final int curEpoch = epoch.incrementAndGet();
+        final int curEpoch = epochUpdater.incrementAndGet(this);
         executeLockAction(curEpoch, new LockAction() {
             @Override
             public void execute() {
@@ -736,7 +737,7 @@ public void onFailure(Throwable cause) {
      *          promise to satisfy with current lock owner.
      */
     private void asyncTryLockWithoutCleanup(final boolean wait, final CompletableFuture<String> promise) {
-        executeLockAction(epoch.get(), new LockAction() {
+        executeLockAction(getEpoch(), new LockAction() {
             @Override
             public void execute() {
                 if (!lockState.inState(State.INIT)) {
@@ -746,7 +747,7 @@ public void execute() {
                 }
                 lockState.transition(State.PREPARING);
 
-                final int curEpoch = epoch.incrementAndGet();
+                final int curEpoch = epochUpdater.incrementAndGet(ZKSessionLock.this);
                 watcher = new LockWatcher(curEpoch);
                 // register watcher for session expires
                 zkClient.register(watcher);
@@ -913,7 +914,7 @@ private void claimOwnership(int lockEpoch) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Notify lock waiters on {} at {} : watcher epoch {}, lock epoch {}",
                     new Object[] { lockPath, System.currentTimeMillis(),
-                            lockEpoch, ZKSessionLock.this.epoch.get() });
+                            lockEpoch, getEpoch() });
         }
         acquireFuture.complete(true);
     }
@@ -924,7 +925,7 @@ private void claimOwnership(int lockEpoch) {
     private void unlockInternal(final CompletableFuture<Void> promise) {
 
         // already closed or expired, nothing to cleanup
-        this.epoch.incrementAndGet();
+        this.epochUpdater.incrementAndGet(this);
         if (null != watcher) {
             this.zkClient.unregister(watcher);
         }
@@ -1026,7 +1027,7 @@ public void execute() {
                 }
 
                 // increment epoch to avoid any ongoing locking action
-                ZKSessionLock.this.epoch.incrementAndGet();
+                epochUpdater.incrementAndGet(ZKSessionLock.this);
 
                 // if session expired, just notify the waiter. as the lock acquire doesn't succeed.
                 // we don't even need to clean up the lock as the znode will disappear after session expired
@@ -1326,7 +1327,7 @@ public String getActionName() {
         @Override
         public void process(WatchedEvent event) {
             LOG.debug("Received event {} from lock {} at {} : watcher epoch {}, lock epoch {}.",
-                    new Object[] {event, lockPath, System.currentTimeMillis(), epoch, ZKSessionLock.this.epoch.get() });
+                    new Object[] {event, lockPath, System.currentTimeMillis(), epoch, getEpoch() });
             if (event.getType() == Watcher.Event.EventType.None) {
                 switch (event.getState()) {
                     case SyncConnected:
@@ -1334,7 +1335,7 @@ public void process(WatchedEvent event) {
                     case Expired:
                         LOG.info("Session {} is expired for lock {} at {} : watcher epoch {}, lock epoch {}.",
                                 new Object[] { lockId.getRight(), lockPath, System.currentTimeMillis(),
-                                        epoch, ZKSessionLock.this.epoch.get() });
+                                        epoch, getEpoch() });
                         handleSessionExpired(epoch);
                         break;
                     default:
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
index b5bc49995..5a076e36f 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
@@ -18,7 +18,7 @@
 package org.apache.distributedlog.util;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
@@ -41,7 +41,9 @@
 
     final Counter acquireFailureCounter;
     final OpStatsLogger permitsMetric;
-    final AtomicInteger permits;
+    private static final AtomicIntegerFieldUpdater<SimplePermitLimiter> permitsUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(SimplePermitLimiter.class, "permits");
+    volatile int permits = 0;
     final int permitsMax;
     final boolean darkmode;
     final Feature disableWriteLimitFeature;
@@ -51,7 +53,6 @@
 
     public SimplePermitLimiter(boolean darkmode, int permitsMax, StatsLogger statsLogger,
                                boolean singleton, Feature disableWriteLimitFeature) {
-        this.permits = new AtomicInteger(0);
         this.permitsMax = permitsMax;
         this.darkmode = darkmode;
         this.disableWriteLimitFeature = disableWriteLimitFeature;
@@ -66,7 +67,7 @@ public Number getDefaultValue() {
                 }
                 @Override
                 public Number getSample() {
-                    return permits.get();
+                    return permitsUpdater.get(SimplePermitLimiter.this);
                 }
             };
             this.permitsGaugeLabel = "permits";
@@ -82,19 +83,19 @@ public boolean isDarkmode() {
 
     @Override
     public boolean acquire() {
-        permitsMetric.registerSuccessfulValue(permits.get());
-        if (permits.incrementAndGet() <= permitsMax || isDarkmode()) {
+        permitsMetric.registerSuccessfulValue(permitsUpdater.get(this)); // sampled before the increment
+        if (permitsUpdater.incrementAndGet(this) <= permitsMax || isDarkmode()) { // darkmode never rejects
             return true;
         } else {
             acquireFailureCounter.inc();
-            permits.decrementAndGet();
+            permitsUpdater.decrementAndGet(this); // undo the increment made above
             return false;
         }
     }
 
     @Override
     public void release(int permitsToRelease) {
-        permits.addAndGet(-permitsToRelease);
+        permitsUpdater.addAndGet(this, -permitsToRelease); // subtract the released permits
     }
 
     @Override
@@ -104,7 +105,7 @@ public void close() {
 
     @VisibleForTesting
     public int getPermits() {
-        return permits.get();
+        return permitsUpdater.get(this); // reads the volatile permits field
     }
 
     public void unregisterGauge() {
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
index bd731a663..211d10c3f 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
@@ -21,7 +21,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -44,14 +44,14 @@
         ALLOWED, DISALLOWED, DISABLED
     }
 
-    class EpochPermit implements Permit {
+    static class EpochPermit implements Permit {
 
         final PermitState state;
         final int epoch;
 
-        EpochPermit(PermitState state) {
+        EpochPermit(PermitState state, int epoch) {
             this.state = state;
-            this.epoch = LimitedPermitManager.this.epoch.get();
+            this.epoch = epoch; // epoch supplied by the caller at creation time
         }
 
         int getEpoch() {
@@ -69,7 +69,9 @@ public boolean isAllowed() {
     final int period;
     final TimeUnit timeUnit;
     final ScheduledExecutorService executorService;
-    final AtomicInteger epoch = new AtomicInteger(0);
+    private static final AtomicIntegerFieldUpdater<LimitedPermitManager> epochUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(LimitedPermitManager.class, "epoch");
+    volatile int epoch = 0;
     private StatsLogger statsLogger = null;
     private Gauge<Number> outstandingGauge = null;
 
@@ -106,13 +108,13 @@ public Number getSample() {
     @Override
     public synchronized Permit acquirePermit() {
         if (!enablePermits) {
-            return new EpochPermit(PermitState.DISABLED);
+            return new EpochPermit(PermitState.DISABLED, epochUpdater.get(this)); // stamp with current epoch
         }
         if (null != semaphore) {
-            return semaphore.tryAcquire() ? new EpochPermit(PermitState.ALLOWED) :
-                    new EpochPermit(PermitState.DISALLOWED);
+            return semaphore.tryAcquire() ? new EpochPermit(PermitState.ALLOWED, epochUpdater.get(this)) :
+                    new EpochPermit(PermitState.DISALLOWED, epochUpdater.get(this));
         } else {
-            return new EpochPermit(PermitState.ALLOWED);
+            return new EpochPermit(PermitState.ALLOWED, epochUpdater.get(this)); // no semaphore: always allowed
         }
     }
 
@@ -138,9 +140,10 @@ public synchronized boolean disallowObtainPermits(Permit permit) {
         if (!(permit instanceof EpochPermit)) {
             return false;
         }
-        if (epoch.getAndIncrement() == ((EpochPermit) permit).getEpoch()) {
+        int epoch = epochUpdater.getAndIncrement(this);
+        if (epoch == ((EpochPermit) permit).getEpoch()) {
             this.enablePermits = false;
-            LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
+            LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch);
             return true;
         } else {
             return false;
@@ -159,9 +162,9 @@ public synchronized boolean allowObtainPermits() {
     }
 
     synchronized void forceSetAllowPermits(boolean allowPermits) {
-        epoch.getAndIncrement();
+        int epoch = epochUpdater.getAndIncrement(this);
         this.enablePermits = allowPermits;
-        LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
+        LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch);
     }
 
     @Override
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
index fe8eed421..8c350c9f4 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
@@ -21,7 +21,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -89,7 +89,7 @@ public ZKWatcherManager build() {
     private static final String numChildWatchesGauageLabel = "num_child_watches";
 
     protected final ConcurrentMap<String, Set<Watcher>> childWatches;
-    protected final AtomicInteger allWatchesGauge;
+    protected final LongAdder allWatchesGauge;
 
     private ZKWatcherManager(String name,
                              ZooKeeperClient zkc,
@@ -100,7 +100,7 @@ private ZKWatcherManager(String name,
 
         // watches
         this.childWatches = new ConcurrentHashMap<String, Set<Watcher>>();
-        this.allWatchesGauge = new AtomicInteger(0);
+        this.allWatchesGauge = new LongAdder();
 
         // stats
         totalWatchesGauge = new Gauge<Number>() {
@@ -111,7 +111,7 @@ public Number getDefaultValue() {
 
             @Override
             public Number getSample() {
-                return allWatchesGauge.get();
+                return allWatchesGauge.sum();
             }
         };
         this.statsLogger.registerGauge(totalWatchesGauageLabel, totalWatchesGauge);
@@ -141,7 +141,7 @@ public Watcher registerChildWatcher(String path, Watcher watcher) {
         synchronized (watchers) {
             if (childWatches.get(path) == watchers) {
                 if (watchers.add(watcher)) {
-                    allWatchesGauge.incrementAndGet();
+                    allWatchesGauge.increment();
                 }
             } else {
                 logger.warn("Watcher set for path {} has been changed while registering child watcher {}.",
@@ -160,7 +160,7 @@ public void unregisterChildWatcher(String path, Watcher watcher, boolean removeF
         }
         synchronized (watchers) {
             if (watchers.remove(watcher)) {
-                allWatchesGauge.decrementAndGet();
+                allWatchesGauge.decrement();
             } else {
                 logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path);
             }
@@ -212,7 +212,7 @@ public void process(WatchedEvent event) {
     }
 
     private void handleKeeperStateEvent(WatchedEvent event) {
-        Set<Watcher> savedAllWatches = new HashSet<Watcher>(allWatchesGauge.get());
+        Set<Watcher> savedAllWatches = new HashSet<Watcher>((int) allWatchesGauge.sum());
         for (Set<Watcher> watcherSet : childWatches.values()) {
             synchronized (watcherSet) {
                 savedAllWatches.addAll(watcherSet);
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
index ad1ebc175..901d2a04c 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
@@ -221,7 +221,7 @@ public void testExecuteLockAction() throws Exception {
 
         // lock action would be executed in same epoch
         final CountDownLatch latch1 = new CountDownLatch(1);
-        lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
+        lock.executeLockAction(lock.getEpoch(), new LockAction() {
             @Override
             public void execute() {
                 counter.incrementAndGet();
@@ -238,7 +238,7 @@ public String getActionName() {
 
         // lock action would not be executed in same epoch
         final CountDownLatch latch2 = new CountDownLatch(1);
-        lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
+        lock.executeLockAction(lock.getEpoch() + 1, new LockAction() {
             @Override
             public void execute() {
                 counter.incrementAndGet();
@@ -249,7 +249,7 @@ public String getActionName() {
                 return "increment2";
             }
         });
-        lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
+        lock.executeLockAction(lock.getEpoch(), new LockAction() {
             @Override
             public void execute() {
                 latch2.countDown();
@@ -265,7 +265,7 @@ public String getActionName() {
 
         // lock action would not be executed in same epoch and promise would be satisfied with exception
         CompletableFuture<Void> promise = new CompletableFuture<Void>();
-        lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
+        lock.executeLockAction(lock.getEpoch() + 1, new LockAction() {
             @Override
             public void execute() {
                 counter.incrementAndGet();
diff --git a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
index e6d076e8d..b0b5bb14f 100644
--- a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
+++ b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
@@ -25,7 +25,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -52,7 +52,9 @@
     private long writePos = 0L;
 
     // state
-    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
+    private static final AtomicReferenceFieldUpdater<DLOutputStream, Throwable> exceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class, Throwable.class, "exception");
+    private volatile Throwable exception = null;
 
     DLOutputStream(DistributedLogManager dlm,
                    AsyncLogWriter writer) {
@@ -83,7 +85,7 @@ public void write(byte[] b, int off, int len) throws IOException {
     }
 
     private synchronized void write(ByteBuf buf) throws IOException {
-        Throwable cause = exception.get();
+        Throwable cause = exceptionUpdater.get(this);
         if (null != cause) {
             if (cause instanceof IOException) {
                 throw (IOException) cause;
@@ -104,7 +106,7 @@ public void onSuccess(DLSN value) {
 
             @Override
             public void onFailure(Throwable cause) {
-                exception.compareAndSet(null, cause);
+                exceptionUpdater.compareAndSet(DLOutputStream.this, null, cause);
             }
         });
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services