You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/02/11 18:10:06 UTC

[hadoop] branch trunk updated: HDFS-14260. Replace synchronized method in BlockReceiver with atomic value. Contributed by BELUGA BEHR.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0ceb1b7  HDFS-14260. Replace synchronized method in BlockReceiver with atomic value. Contributed by BELUGA BEHR.
0ceb1b7 is described below

commit 0ceb1b70f3200873fe1f40c264b91051b4a3d721
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Mon Feb 11 10:09:44 2019 -0800

    HDFS-14260. Replace synchronized method in BlockReceiver with atomic value. Contributed by BELUGA BEHR.
---
 .../hadoop/hdfs/server/datanode/BlockReceiver.java | 40 +++++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index cb1f73d..9509184 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
@@ -142,7 +143,7 @@ class BlockReceiver implements Closeable {
   private long maxWriteToDiskMs = 0;
   
   private boolean pinning;
-  private long lastSentTime;
+  private final AtomicLong lastSentTime = new AtomicLong(0L);
   private long maxSendIdleTime;
 
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
@@ -182,7 +183,7 @@ class BlockReceiver implements Closeable {
           || stage == BlockConstructionStage.TRANSFER_FINALIZED;
 
       this.pinning = pinning;
-      this.lastSentTime = Time.monotonicNow();
+      this.lastSentTime.set(Time.monotonicNow());
       // Downstream will timeout in readTimeout on receiving the next packet.
       // If there is no data traffic, a heartbeat packet is sent at
       // the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be
@@ -379,23 +380,28 @@ class BlockReceiver implements Closeable {
     }
   }
 
-  synchronized void setLastSentTime(long sentTime) {
-    lastSentTime = sentTime;
-  }
-
   /**
-   * It can return false if
-   * - upstream did not send packet for a long time
-   * - a packet was received but got stuck in local disk I/O.
-   * - a packet was received but got stuck on send to mirror.
+   * Check if a packet was sent within an acceptable period of time.
+   *
+   * Some example of when this method may return false:
+   * <ul>
+   * <li>Upstream did not send packet for a long time</li>
+   * <li>Packet was received but got stuck in local disk I/O</li>
+   * <li>Packet was received but got stuck on send to mirror</li>
+   * </ul>
+   *
+   * @return true if packet was sent within an acceptable period of time;
+   *         otherwise false.
    */
-  synchronized boolean packetSentInTime() {
-    long diff = Time.monotonicNow() - lastSentTime;
-    if (diff > maxSendIdleTime) {
-      LOG.info("A packet was last sent " + diff + " milliseconds ago.");
-      return false;
+  boolean packetSentInTime() {
+    final long diff = Time.monotonicNow() - this.lastSentTime.get();
+    final boolean allowedIdleTime = (diff <= this.maxSendIdleTime);
+    LOG.debug("A packet was last sent {}ms ago.", diff);
+    if (!allowedIdleTime) {
+      LOG.warn("A packet was last sent {}ms ago. Maximum idle time: {}ms.",
+          diff, this.maxSendIdleTime);
     }
-    return true;
+    return allowedIdleTime;
   }
 
   /**
@@ -589,7 +595,7 @@ class BlockReceiver implements Closeable {
         packetReceiver.mirrorPacketTo(mirrorOut);
         mirrorOut.flush();
         long now = Time.monotonicNow();
-        setLastSentTime(now);
+        this.lastSentTime.set(now);
         long duration = now - begin;
         DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
             mirrorAddr,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org