You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/02/11 18:10:06 UTC
[hadoop] branch trunk updated: HDFS-14260. Replace synchronized
method in BlockReceiver with atomic value. Contributed by BELUGA BEHR.
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0ceb1b7 HDFS-14260. Replace synchronized method in BlockReceiver with atomic value. Contributed by BELUGA BEHR.
0ceb1b7 is described below
commit 0ceb1b70f3200873fe1f40c264b91051b4a3d721
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Mon Feb 11 10:09:44 2019 -0800
HDFS-14260. Replace synchronized method in BlockReceiver with atomic value. Contributed by BELUGA BEHR.
---
.../hadoop/hdfs/server/datanode/BlockReceiver.java | 40 +++++++++++++---------
1 file changed, 23 insertions(+), 17 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index cb1f73d..9509184 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
@@ -142,7 +143,7 @@ class BlockReceiver implements Closeable {
private long maxWriteToDiskMs = 0;
private boolean pinning;
- private long lastSentTime;
+ private final AtomicLong lastSentTime = new AtomicLong(0L);
private long maxSendIdleTime;
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
@@ -182,7 +183,7 @@ class BlockReceiver implements Closeable {
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
this.pinning = pinning;
- this.lastSentTime = Time.monotonicNow();
+ this.lastSentTime.set(Time.monotonicNow());
// Downstream will timeout in readTimeout on receiving the next packet.
// If there is no data traffic, a heartbeat packet is sent at
// the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be
@@ -379,23 +380,28 @@ class BlockReceiver implements Closeable {
}
}
- synchronized void setLastSentTime(long sentTime) {
- lastSentTime = sentTime;
- }
-
/**
- * It can return false if
- * - upstream did not send packet for a long time
- * - a packet was received but got stuck in local disk I/O.
- * - a packet was received but got stuck on send to mirror.
+ * Check if a packet was sent within an acceptable period of time.
+ *
+ * Some examples of when this method may return false:
+ * <ul>
+ * <li>Upstream did not send packet for a long time</li>
+ * <li>Packet was received but got stuck in local disk I/O</li>
+ * <li>Packet was received but got stuck on send to mirror</li>
+ * </ul>
+ *
+ * @return true if packet was sent within an acceptable period of time;
+ * otherwise false.
*/
- synchronized boolean packetSentInTime() {
- long diff = Time.monotonicNow() - lastSentTime;
- if (diff > maxSendIdleTime) {
- LOG.info("A packet was last sent " + diff + " milliseconds ago.");
- return false;
+ boolean packetSentInTime() {
+ final long diff = Time.monotonicNow() - this.lastSentTime.get();
+ final boolean allowedIdleTime = (diff <= this.maxSendIdleTime);
+ LOG.debug("A packet was last sent {}ms ago.", diff);
+ if (!allowedIdleTime) {
+ LOG.warn("A packet was last sent {}ms ago. Maximum idle time: {}ms.",
+ diff, this.maxSendIdleTime);
}
- return true;
+ return allowedIdleTime;
}
/**
@@ -589,7 +595,7 @@ class BlockReceiver implements Closeable {
packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
long now = Time.monotonicNow();
- setLastSentTime(now);
+ this.lastSentTime.set(now);
long duration = now - begin;
DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
mirrorAddr,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org