Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/04/02 20:09:43 UTC
[05/50] [abbrv] hadoop git commit: HDFS-8008. Support client-side back off when the datanodes are congested. Contributed by Haohui Mai.

HDFS-8008. Support client-side back off when the datanodes are congested. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6ccf4fbf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6ccf4fbf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6ccf4fbf
Branch: refs/heads/HDFS-7285
Commit: 6ccf4fbf8a8374c289370f67b26ac05abad30ebc
Parents: 75cb1d4
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Apr 1 16:54:46 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Wed Apr 1 16:54:46 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DataStreamer.java | 63 ++++++++++++++++++++
.../hdfs/protocol/datatransfer/PipelineAck.java | 4 ++
.../apache/hadoop/hdfs/TestDFSOutputStream.java | 42 +++++++++++++
4 files changed, 112 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ccf4fbf/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1d9e200..34c0556 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -868,6 +868,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7742. Favoring decommissioning node for replication can cause a block
to stay underreplicated for long periods (Nathan Roberts via kihwal)
+ HDFS-8008. Support client-side back off when the datanodes are congested.
+ (wheat9)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ccf4fbf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 9c437ba..6ff4c24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -218,6 +218,13 @@ class DataStreamer extends Daemon {
private boolean failPacket = false;
private final long dfsclientSlowLogThresholdMs;
private long artificialSlowdown = 0;
+ // List of congested data nodes. The stream will back off if the DataNodes
+ // are congested.
+ private final ArrayList<DatanodeInfo> congestedNodes = new ArrayList<>();
+ private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
+ private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
+ CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
+ private int lastCongestionBackoffTime;
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
@@ -386,6 +393,11 @@ class DataStreamer extends Daemon {
one = createHeartbeatPacket();
assert one != null;
} else {
+ try {
+ backOffIfNecessary();
+ } catch (InterruptedException e) {
+ DFSClient.LOG.warn("Caught exception ", e);
+ }
one = dataQueue.getFirst(); // regular data packet
long parents[] = one.getTraceParents();
if (parents.length > 0) {
@@ -815,9 +827,14 @@ class DataStreamer extends Daemon {
long seqno = ack.getSeqno();
// processes response status from datanodes.
+ ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
final Status reply = PipelineAck.getStatusFromHeader(ack
.getHeaderFlag(i));
+ if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
+ PipelineAck.ECN.CONGESTED) {
+ congestedNodesFromAck.add(targets[i]);
+ }
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&
@@ -839,6 +856,18 @@ class DataStreamer extends Daemon {
}
}
+ if (!congestedNodesFromAck.isEmpty()) {
+ synchronized (congestedNodes) {
+ congestedNodes.clear();
+ congestedNodes.addAll(congestedNodesFromAck);
+ }
+ } else {
+ synchronized (congestedNodes) {
+ congestedNodes.clear();
+ lastCongestionBackoffTime = 0;
+ }
+ }
+
assert seqno != PipelineAck.UNKOWN_SEQNO :
"Ack for unknown seqno should be a failed ack: " + ack;
if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
@@ -1544,6 +1573,40 @@ class DataStreamer extends Daemon {
}
/**
+ * This function sleeps for a certain amount of time when the writing
+ * pipeline is congested. It calculates the sleep time using decorrelated
+ * jitter.
+ *
+ * @see
+ * <a href="http://www.awsarchitectureblog.com/2015/03/backoff.html">
+ * http://www.awsarchitectureblog.com/2015/03/backoff.html</a>
+ */
+ private void backOffIfNecessary() throws InterruptedException {
+ int t = 0;
+ synchronized (congestedNodes) {
+ if (!congestedNodes.isEmpty()) {
+ StringBuilder sb = new StringBuilder("DataNode");
+ for (DatanodeInfo i : congestedNodes) {
+ sb.append(' ').append(i);
+ }
+ int range = Math.abs(lastCongestionBackoffTime * 3 -
+ CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
+ int base = Math.min(lastCongestionBackoffTime * 3,
+ CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
+ t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS,
+ (int)(base + Math.random() * range));
+ lastCongestionBackoffTime = t;
+ sb.append(" are congested. Backing off for ").append(t).append(" ms");
+ DFSClient.LOG.info(sb.toString());
+ congestedNodes.clear();
+ }
+ }
+ if (t != 0) {
+ Thread.sleep(t);
+ }
+ }
+
+ /**
* get the block this streamer is writing to
*
* @return the block this streamer is writing to
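
A note on the algorithm: backOffIfNecessary() implements the "decorrelated jitter" strategy described in the AWS architecture blog post linked in the javadoc. Each sleep interval is drawn uniformly from a window between the mean backoff time and three times the previous interval, capped at the maximum, so repeated congestion grows the sleep time while randomization keeps retries from synchronizing across clients. The following self-contained sketch reproduces the same calculation outside of DataStreamer; the class and method names are illustrative only, not part of this patch:

    public class DecorrelatedJitter {
      // Mirrors CONGESTION_BACKOFF_MEAN_TIME_IN_MS and
      // CONGESTION_BACK_OFF_MAX_TIME_IN_MS in DataStreamer above.
      private static final int MEAN_MS = 5000;
      private static final int MAX_MS = MEAN_MS * 10;
      private int last = 0; // mirrors lastCongestionBackoffTime

      // Next sleep interval in ms: uniform over
      // [min(3*last, MEAN_MS), max(3*last, MEAN_MS)), capped at MAX_MS.
      int next() {
        int range = Math.abs(last * 3 - MEAN_MS);
        int base = Math.min(last * 3, MEAN_MS);
        last = Math.min(MAX_MS, (int) (base + Math.random() * range));
        return last;
      }

      public static void main(String[] args) {
        DecorrelatedJitter j = new DecorrelatedJitter();
        for (int i = 0; i < 5; i++) {
          System.out.println("backoff " + (i + 1) + ": " + j.next() + " ms");
        }
      }
    }

The first interval falls in [0, 5000) ms and subsequent intervals drift toward the 50-second cap, while the ack-processing change above resets lastCongestionBackoffTime to 0 as soon as an ack arrives with no congested nodes.
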
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ccf4fbf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
index 9bd4115..a811f39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -257,6 +257,10 @@ public class PipelineAck {
return StatusFormat.getStatus(header);
}
+ public static ECN getECNFromHeader(int header) {
+ return StatusFormat.getECN(header);
+ }
+
public static int setStatusForHeader(int old, Status status) {
return StatusFormat.setStatus(old, status);
}
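
The new accessor exposes the ECN bits that datanodes set in each reply's header flag when they are under load, which is what lets DataStreamer detect congestion above. A minimal usage sketch, assuming an ack (PipelineAck) and the pipeline's targets (DatanodeInfo[]) are in scope as they are in DataStreamer:

    // Collect the datanodes that reported congestion in this ack.
    ArrayList<DatanodeInfo> congested = new ArrayList<>();
    for (int i = ack.getNumOfReplies() - 1; i >= 0; i--) {
      if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
          PipelineAck.ECN.CONGESTED) {
        congested.add(targets[i]);
      }
    }
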
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ccf4fbf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index b47e7f1..a410e74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -17,20 +17,31 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
public class TestDFSOutputStream {
static MiniDFSCluster cluster;
@@ -100,6 +111,37 @@ public class TestDFSOutputStream {
Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
}
+ @Test
+ public void testCongestionBackoff() throws IOException {
+ DFSClient.Conf dfsClientConf = mock(DFSClient.Conf.class);
+ DFSClient client = mock(DFSClient.class);
+ when(client.getConf()).thenReturn(dfsClientConf);
+ client.clientRunning = true;
+ DataStreamer stream = new DataStreamer(
+ mock(HdfsFileStatus.class),
+ mock(ExtendedBlock.class),
+ client,
+ "foo", null, null, null, null);
+
+ DataOutputStream blockStream = mock(DataOutputStream.class);
+ doThrow(new IOException()).when(blockStream).flush();
+ Whitebox.setInternalState(stream, "blockStream", blockStream);
+ Whitebox.setInternalState(stream, "stage",
+ BlockConstructionStage.PIPELINE_CLOSE);
+ @SuppressWarnings("unchecked")
+ LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
+ Whitebox.getInternalState(stream, "dataQueue");
+ @SuppressWarnings("unchecked")
+ ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
+ Whitebox.getInternalState(stream, "congestedNodes");
+ congestedNodes.add(mock(DatanodeInfo.class));
+ DFSPacket packet = mock(DFSPacket.class);
+ when(packet.getTraceParents()).thenReturn(new long[] {});
+ dataQueue.add(packet);
+ stream.run();
+ Assert.assertTrue(congestedNodes.isEmpty());
+ }
+
@AfterClass
public static void tearDown() {
cluster.shutdown();