Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2021/04/17 07:49:35 UTC

[hadoop] branch trunk updated: HDFS-15975. Use LongAdder instead of AtomicLong (#2907)

This is an automated email from the ASF dual-hosted git repository.

tasanuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af0448d  HDFS-15975. Use LongAdder instead of AtomicLong (#2907)
af0448d is described below

commit af0448d37bd8f303e8db845ecbc2ca31d79a70f7
Author: litao <to...@gmail.com>
AuthorDate: Sat Apr 17 15:49:09 2021 +0800

    HDFS-15975. Use LongAdder instead of AtomicLong (#2907)
---
 .../hadoop/metrics2/lib/MutableCounterLong.java    | 10 ++++----
 .../apache/hadoop/hdfs/DFSHedgedReadMetrics.java   | 14 +++++------
 .../apache/hadoop/hdfs/DFSOpsCountStatistics.java  | 20 ++++++++--------
 .../datanode/fsdataset/impl/FsDatasetCache.java    | 28 +++++++++++-----------
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  2 +-
 .../hadoop/hdfs/server/namenode/FSEditLog.java     | 10 ++++----
 .../hdfs/server/namenode/FSNamesystemLock.java     | 13 +++++-----
 .../java/org/apache/hadoop/hdfs/TestPread.java     |  8 +++----
 8 files changed, 53 insertions(+), 52 deletions(-)
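
The swap is mechanical, but the motivation deserves a sentence: AtomicLong funnels every update through a compare-and-swap on a single memory word, while java.util.concurrent.atomic.LongAdder stripes updates across internal cells and only folds them together when the value is read. For write-heavy, read-rarely counters like these metrics, that trades a slightly more expensive read for far less contention on the write path. Note that none of the call sites below use the return value of incrementAndGet()/addAndGet(), which is exactly the precondition that makes the weaker LongAdder contract a safe replacement. A minimal sketch of the one-for-one API mapping the commit applies (illustrative only, not taken from the patch):

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;

    class CounterMapping {
        public static void main(String[] args) {
            AtomicLong atomic = new AtomicLong();
            LongAdder adder = new LongAdder();

            atomic.incrementAndGet();   // becomes adder.increment()
            adder.increment();

            atomic.addAndGet(5);        // becomes adder.add(5)
            adder.add(5);

            atomic.addAndGet(-1);       // becomes adder.decrement() or adder.add(-1)
            adder.decrement();

            long a = atomic.get();      // becomes adder.longValue(), an alias for sum()
            long b = adder.longValue();

            atomic.set(0);              // becomes adder.reset()
            adder.reset();

            System.out.println(a + " " + b);  // 5 5
        }
    }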

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java
index d3dec2e..efaf8a1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.MetricsInfo;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 
 /**
  * A mutable long counter
@@ -32,11 +32,11 @@ import java.util.concurrent.atomic.AtomicLong;
 @InterfaceStability.Evolving
 public class MutableCounterLong extends MutableCounter {
 
-  private AtomicLong value = new AtomicLong();
+  private final LongAdder value = new LongAdder();
 
   public MutableCounterLong(MetricsInfo info, long initValue) {
     super(info);
-    this.value.set(initValue);
+    this.value.add(initValue);
   }
 
   @Override
@@ -49,12 +49,12 @@ public class MutableCounterLong extends MutableCounter {
    * @param delta of the increment
    */
   public void incr(long delta) {
-    value.addAndGet(delta);
+    value.add(delta);
     setChanged();
   }
 
   public long value() {
-    return value.get();
+    return value.longValue();
   }
 
   @Override
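
One subtlety in MutableCounterLong: LongAdder has no set(), so the constructor seeds the counter with value.add(initValue) rather than value.set(initValue); the result is identical because a fresh adder starts at zero. A rough, hypothetical demonstration of the contention behavior the swap targets (thread and iteration counts are arbitrary):

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;

    class ContentionDemo {
        public static void main(String[] args) throws InterruptedException {
            final int threads = 8;
            final int iterations = 1_000_000;  // arbitrary sizes
            final AtomicLong atomic = new AtomicLong();
            final LongAdder adder = new LongAdder();

            Thread[] workers = new Thread[threads];
            for (int i = 0; i < threads; i++) {
                workers[i] = new Thread(() -> {
                    for (int j = 0; j < iterations; j++) {
                        atomic.incrementAndGet();  // every thread CASes the same word
                        adder.increment();         // threads usually hit distinct cells
                    }
                });
                workers[i].start();
            }
            for (Thread w : workers) {
                w.join();
            }
            // Both totals are exact once all writers have finished.
            System.out.println(atomic.get() + " == " + adder.sum());
        }
    }
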
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
index 2a228e8..1cd9e82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 
 /**
  * The client-side metrics for hedged read feature.
@@ -28,20 +28,20 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 @InterfaceAudience.Private
 public class DFSHedgedReadMetrics {
-  public final AtomicLong hedgedReadOps = new AtomicLong();
-  public final AtomicLong hedgedReadOpsWin = new AtomicLong();
-  public final AtomicLong hedgedReadOpsInCurThread = new AtomicLong();
+  public final LongAdder hedgedReadOps = new LongAdder();
+  public final LongAdder hedgedReadOpsWin = new LongAdder();
+  public final LongAdder hedgedReadOpsInCurThread = new LongAdder();
 
   public void incHedgedReadOps() {
-    hedgedReadOps.incrementAndGet();
+    hedgedReadOps.increment();
   }
 
   public void incHedgedReadOpsInCurThread() {
-    hedgedReadOpsInCurThread.incrementAndGet();
+    hedgedReadOpsInCurThread.increment();
   }
 
   public void incHedgedReadWins() {
-    hedgedReadOpsWin.incrementAndGet();
+    hedgedReadOpsWin.increment();
   }
 
   public long getHedgedReadOps() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java
index fdd0072..04fef2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java
@@ -26,7 +26,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 
 /**
  * This storage statistics tracks how many times each DFS operation was issued.
@@ -141,21 +141,21 @@ public class DFSOpsCountStatistics extends StorageStatistics {
 
   public static final String NAME = "DFSOpsCountStatistics";
 
-  private final Map<OpType, AtomicLong> opsCount = new EnumMap<>(OpType.class);
+  private final Map<OpType, LongAdder> opsCount = new EnumMap<>(OpType.class);
 
   public DFSOpsCountStatistics() {
     super(NAME);
     for (OpType opType : OpType.values()) {
-      opsCount.put(opType, new AtomicLong(0));
+      opsCount.put(opType, new LongAdder());
     }
   }
 
   public void incrementOpCounter(OpType op) {
-    opsCount.get(op).addAndGet(1);
+    opsCount.get(op).increment();
   }
 
   private class LongIterator implements Iterator<LongStatistic> {
-    private Iterator<Entry<OpType, AtomicLong>> iterator =
+    private final Iterator<Entry<OpType, LongAdder>> iterator =
         opsCount.entrySet().iterator();
 
     @Override
@@ -168,9 +168,9 @@ public class DFSOpsCountStatistics extends StorageStatistics {
       if (!iterator.hasNext()) {
         throw new NoSuchElementException();
       }
-      final Entry<OpType, AtomicLong> entry = iterator.next();
+      final Entry<OpType, LongAdder> entry = iterator.next();
       return new LongStatistic(entry.getKey().getSymbol(),
-          entry.getValue().get());
+          entry.getValue().longValue());
     }
 
     @Override
@@ -192,7 +192,7 @@ public class DFSOpsCountStatistics extends StorageStatistics {
   @Override
   public Long getLong(String key) {
     final OpType type = OpType.fromSymbol(key);
-    return type == null ? null : opsCount.get(type).get();
+    return type == null ? null : opsCount.get(type).longValue();
   }
 
   @Override
@@ -202,8 +202,8 @@ public class DFSOpsCountStatistics extends StorageStatistics {
 
   @Override
   public void reset() {
-    for (AtomicLong count : opsCount.values()) {
-      count.set(0);
+    for (LongAdder count : opsCount.values()) {
+      count.reset();
     }
   }
 
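A caveat on reset(): LongAdder.reset() is not an atomic exchange. Updates racing with the reset may land in either the old or the new total, and the discarded sum is not observable. For a cumulative counter that is only cleared on reinitialization or in tests, as in DFSOpsCountStatistics here and in TestPread below, that is acceptable; a caller that wants interval deltas would reach for sumThenReset(), which is equally weakly consistent but at least returns the swept value. A hedged sketch of the difference (usage is hypothetical, not from the commit):

    import java.util.concurrent.atomic.LongAdder;

    class ResetSemantics {
        public static void main(String[] args) {
            LongAdder ops = new LongAdder();
            ops.add(42);

            // reset() discards the running total without reporting it.
            ops.reset();

            // sumThenReset() returns the swept total, which suits
            // periodic "ops since last poll" style metrics.
            ops.add(7);
            long interval = ops.sumThenReset();
            System.out.println(interval + " " + ops.sum());  // 7 0
        }
    }
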
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index b6a57fd..facace2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -42,7 +42,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -120,7 +120,7 @@ public class FsDatasetCache {
   private final HashMap<ExtendedBlockId, Value> mappableBlockMap =
       new HashMap<ExtendedBlockId, Value>();
 
-  private final AtomicLong numBlocksCached = new AtomicLong(0);
+  private final LongAdder numBlocksCached = new LongAdder();
 
   private final FsDatasetImpl dataset;
 
@@ -143,11 +143,11 @@ public class FsDatasetCache {
   /**
    * Number of cache commands that could not be completed successfully
    */
-  final AtomicLong numBlocksFailedToCache = new AtomicLong(0);
+  final LongAdder numBlocksFailedToCache = new LongAdder();
   /**
    * Number of uncache commands that could not be completed successfully
    */
-  final AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
+  final LongAdder numBlocksFailedToUncache = new LongAdder();
 
   public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
     this.dataset = dataset;
@@ -204,7 +204,7 @@ public class FsDatasetCache {
       for (Map.Entry<ExtendedBlockId, MappableBlock> entry : entrySet) {
         mappableBlockMap.put(entry.getKey(),
             new Value(keyToMappableBlock.get(entry.getKey()), State.CACHED));
-        numBlocksCached.addAndGet(1);
+        numBlocksCached.increment();
         dataset.datanode.getMetrics().incrBlocksCached(1);
       }
     }
@@ -278,7 +278,7 @@ public class FsDatasetCache {
       LOG.debug("Block with id {}, pool {} already exists in the "
               + "FsDatasetCache with state {}", blockId, bpid, prevValue.state
       );
-      numBlocksFailedToCache.incrementAndGet();
+      numBlocksFailedToCache.increment();
       return;
     }
     mappableBlockMap.put(key, new Value(null, State.CACHING));
@@ -301,7 +301,7 @@ public class FsDatasetCache {
       LOG.debug("Block with id {}, pool {} does not need to be uncached, "
           + "because it is not currently in the mappableBlockMap.", blockId,
           bpid);
-      numBlocksFailedToUncache.incrementAndGet();
+      numBlocksFailedToUncache.increment();
       return;
     }
     switch (prevValue.state) {
@@ -331,7 +331,7 @@ public class FsDatasetCache {
     default:
       LOG.debug("Block with id {}, pool {} does not need to be uncached, "
           + "because it is in state {}.", blockId, bpid, prevValue.state);
-      numBlocksFailedToUncache.incrementAndGet();
+      numBlocksFailedToUncache.increment();
       break;
     }
   }
@@ -469,7 +469,7 @@ public class FsDatasetCache {
           dataset.datanode.
               getShortCircuitRegistry().processBlockMlockEvent(key);
         }
-        numBlocksCached.addAndGet(1);
+        numBlocksCached.increment();
         dataset.datanode.getMetrics().incrBlocksCached(1);
         success = true;
       } finally {
@@ -482,7 +482,7 @@ public class FsDatasetCache {
           LOG.debug("Caching of {} was aborted.  We are now caching only {} "
                   + "bytes in total.", key, cacheLoader.getCacheUsed());
           IOUtils.closeQuietly(mappableBlock);
-          numBlocksFailedToCache.incrementAndGet();
+          numBlocksFailedToCache.increment();
 
           synchronized (FsDatasetCache.this) {
             mappableBlockMap.remove(key);
@@ -561,7 +561,7 @@ public class FsDatasetCache {
       }
       long newUsedBytes = cacheLoader.
           release(key, value.mappableBlock.getLength());
-      numBlocksCached.addAndGet(-1);
+      numBlocksCached.decrement();
       dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (revocationTimeMs != 0) {
         LOG.debug("Uncaching of {} completed. usedBytes = {}",
@@ -607,15 +607,15 @@ public class FsDatasetCache {
   }
 
   public long getNumBlocksFailedToCache() {
-    return numBlocksFailedToCache.get();
+    return numBlocksFailedToCache.longValue();
   }
 
   public long getNumBlocksFailedToUncache() {
-    return numBlocksFailedToUncache.get();
+    return numBlocksFailedToUncache.longValue();
   }
 
   public long getNumBlocksCached() {
-    return numBlocksCached.get();
+    return numBlocksCached.longValue();
   }
 
   public synchronized boolean isCached(String bpid, long blockId) {
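
FsDatasetCache also exercises the decrement side: numBlocksCached.decrement() stands in for addAndGet(-1). The read path deserves a note as well: longValue() is an alias for sum(), which walks the internal cells without locking, so a value read while writers are active is a moment-in-time estimate rather than a linearizable snapshot. That is fine for a gauge exported as a metric; it would not be fine for code that branches on an exact live count. A small illustrative sketch (class and method names are hypothetical):

    import java.util.concurrent.atomic.LongAdder;

    class GaugeSketch {
        private final LongAdder cachedBlocks = new LongAdder();

        void onBlockCached()   { cachedBlocks.increment(); }
        void onBlockUncached() { cachedBlocks.decrement(); }

        // Suitable for reporting: an approximate value while writers run,
        // exact once they have quiesced.
        long snapshot() { return cachedBlocks.longValue(); }

        public static void main(String[] args) {
            GaugeSketch g = new GaugeSketch();
            g.onBlockCached();
            g.onBlockCached();
            g.onBlockUncached();
            System.out.println(g.snapshot());  // 1
        }
    }
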
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index d06d3cf..d148f77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2463,7 +2463,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         success = true;
       } finally {
         if (!success) {
-          cacheManager.numBlocksFailedToCache.incrementAndGet();
+          cacheManager.numBlocksFailedToCache.increment();
         }
       }
       blockFileName = info.getBlockURI().toString();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 6b73bbd..79f039b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -28,7 +28,7 @@ import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -183,7 +183,7 @@ public class FSEditLog implements LogsPurgeable {
   
   // these are statistics counters.
   private long numTransactions;        // number of transactions
-  private final AtomicLong numTransactionsBatchedInSync = new AtomicLong();
+  private final LongAdder numTransactionsBatchedInSync = new LongAdder();
   private long totalTimeTransactions;  // total time for all transactions
   private NameNodeMetrics metrics;
 
@@ -731,7 +731,7 @@ public class FSEditLog implements LogsPurgeable {
       if (metrics != null) { // Metrics non-null only when used inside name node
         metrics.addSync(elapsed);
         metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
-        numTransactionsBatchedInSync.addAndGet(editsBatchedInSync);
+        numTransactionsBatchedInSync.add(editsBatchedInSync);
       }
       
     } finally {
@@ -771,7 +771,7 @@ public class FSEditLog implements LogsPurgeable {
         .append(" Total time for transactions(ms): ")
         .append(totalTimeTransactions)
         .append(" Number of transactions batched in Syncs: ")
-        .append(numTransactionsBatchedInSync.get())
+        .append(numTransactionsBatchedInSync.longValue())
         .append(" Number of syncs: ")
         .append(editLogStream.getNumSync())
         .append(" SyncTimes(ms): ")
@@ -1404,7 +1404,7 @@ public class FSEditLog implements LogsPurgeable {
     
     numTransactions = 0;
     totalTimeTransactions = 0;
-    numTransactionsBatchedInSync.set(0L);
+    numTransactionsBatchedInSync.reset();
 
     // TODO no need to link this back to storage anymore!
     // See HDFS-2174.
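
FSEditLog makes a useful contrast: numTransactions stays a plain long, presumably because it is only ever touched under the log's own synchronization, while numTransactionsBatchedInSync becomes a LongAdder and needs no such protection. A schematic of that two-tier pattern (hypothetical class, assuming the plain counter is always guarded by the instance lock):

    import java.util.concurrent.atomic.LongAdder;

    class EditLogSketch {
        // Guarded by "this": only mutated inside synchronized methods.
        private long numTransactions;
        // Updated from arbitrary threads without taking the lock.
        private final LongAdder batchedInSync = new LongAdder();

        synchronized void logEdit() {
            numTransactions++;
        }

        void recordBatched(long edits) {
            batchedInSync.add(edits);
        }

        synchronized String statistics() {
            return numTransactions + " txns, "
                + batchedInSync.longValue() + " batched in syncs";
        }
    }
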
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
index 842c6b3..b4f479f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
@@ -113,12 +114,12 @@ class FSNamesystemLock {
    * The number of time the read lock
    * has been held longer than the threshold.
    */
-  private final AtomicLong numReadLockLongHold = new AtomicLong(0);
+  private final LongAdder numReadLockLongHold = new LongAdder();
   /**
    * The number of time the write lock
    * has been held for longer than the threshold.
    */
-  private final AtomicLong numWriteLockLongHold = new AtomicLong(0);
+  private final LongAdder numWriteLockLongHold = new LongAdder();
 
   @VisibleForTesting
   static final String OP_NAME_OTHER = "OTHER";
@@ -192,7 +193,7 @@ class FSNamesystemLock {
     final long readLockIntervalMs =
         TimeUnit.NANOSECONDS.toMillis(readLockIntervalNanos);
     if (needReport && readLockIntervalMs >= this.readLockReportingThresholdMs) {
-      numReadLockLongHold.incrementAndGet();
+      numReadLockLongHold.increment();
       String lockReportInfo = null;
       boolean done = false;
       while (!done) {
@@ -309,7 +310,7 @@ class FSNamesystemLock {
     LogAction logAction = LogThrottlingHelper.DO_NOT_LOG;
     if (needReport &&
         writeLockIntervalMs >= this.writeLockReportingThresholdMs) {
-      numWriteLockLongHold.incrementAndGet();
+      numWriteLockLongHold.increment();
       if (longestWriteLockHeldInfo.getIntervalMs() <= writeLockIntervalMs) {
         String lockReportInfo = lockReportInfoSupplier != null ? " (" +
             lockReportInfoSupplier.get() + ")" : "";
@@ -382,7 +383,7 @@ class FSNamesystemLock {
    * has been held longer than the threshold
    */
   public long getNumOfReadLockLongHold() {
-    return numReadLockLongHold.get();
+    return numReadLockLongHold.longValue();
   }
 
   /**
@@ -393,7 +394,7 @@ class FSNamesystemLock {
    * has been held longer than the threshold.
    */
   public long getNumOfWriteLockLongHold() {
-    return numWriteLockLongHold.get();
+    return numWriteLockLongHold.longValue();
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index ac3c122..c1e0dbb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -401,9 +401,9 @@ public class TestPread {
     DFSClient dfsClient = fileSys.getClient();
     DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
     // Metrics instance is static, so we need to reset counts from prior tests.
-    metrics.hedgedReadOps.set(0);
-    metrics.hedgedReadOpsWin.set(0);
-    metrics.hedgedReadOpsInCurThread.set(0);
+    metrics.hedgedReadOps.reset();
+    metrics.hedgedReadOpsWin.reset();
+    metrics.hedgedReadOpsInCurThread.reset();
 
     try {
       Path file1 = new Path("hedgedReadMaxOut.dat");
@@ -590,7 +590,7 @@ public class TestPread {
     String filename = "/hedgedReadMaxOut.dat";
     DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
     // Metrics instance is static, so we need to reset counts from prior tests.
-    metrics.hedgedReadOps.set(0);
+    metrics.hedgedReadOps.reset();
     try {
       Path file = new Path(filename);
       output = fileSys.create(file, (short) 2);
