You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/02/02 19:19:23 UTC
hadoop git commit: HDFS-9178. Slow datanode I/O can cause a wrong
node to be marked bad. Contributed by Kihwal Lee.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.6 572b5872f -> 30626ad35
HDFS-9178. Slow datanode I/O can cause a wrong node to be marked bad. Contributed by Kihwal Lee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/30626ad3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/30626ad3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/30626ad3
Branch: refs/heads/branch-2.6
Commit: 30626ad35f636d8d3ff4b00be42f3d1e7c35a55a
Parents: 572b587
Author: Junping Du <ju...@apache.org>
Authored: Tue Feb 2 10:29:31 2016 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Tue Feb 2 10:29:31 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/datanode/BlockReceiver.java | 52 +++++++++++++++-
.../server/datanode/DataNodeFaultInjector.java | 8 +++
.../TestClientProtocolForPipelineRecovery.java | 63 ++++++++++++++++++++
4 files changed, 124 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30626ad3/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 612da31..ae6bb12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -43,6 +43,9 @@ Release 2.6.4 - UNRELEASED
HDFS-4660. Block corruption can happen during pipeline recovery (kihwal)
+ HDFS-9178. Slow datanode I/O can cause a wrong node to be marked bad.
+ (Kihwal Lee)
+
Release 2.6.3 - 2015-12-17
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30626ad3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 4b6668b..32e40d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -24,6 +24,7 @@ import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
+import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.FileWriter;
@@ -130,6 +131,8 @@ class BlockReceiver implements Closeable {
private long lastResponseTime = 0;
private boolean isReplaceBlock = false;
private DataOutputStream replyOut = null;
+ private long lastSentTime;
+ private long maxSendIdleTime;
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
@@ -155,7 +158,8 @@ class BlockReceiver implements Closeable {
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
// For replaceBlock() calls response should be sent to avoid socketTimeout
// at clients. So sending with the interval of 0.5 * socketTimeout
- this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
+ final long readTimeout = datanode.getDnConf().socketTimeout;
+ this.responseInterval = (long) (readTimeout * 0.5);
//for datanode, we have
//1: clientName.length() == 0, and
//2: stage == null or PIPELINE_SETUP_CREATE
@@ -163,6 +167,12 @@ class BlockReceiver implements Closeable {
this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
+ this.lastSentTime = Time.monotonicNow();
+ // Downstream will timeout in readTimeout on receiving the next packet.
+ // If there is no data traffic, a heartbeat packet is sent at
+ // the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be
+ // the threshold for detecting congestion.
+ this.maxSendIdleTime = (long) (readTimeout * 0.9);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ": " + block
+ "\n isClient =" + isClient + ", clientname=" + clientname
@@ -346,6 +356,25 @@ class BlockReceiver implements Closeable {
}
}
+ synchronized void setLastSentTime(long sentTime) {
+ lastSentTime = sentTime; // monotonic ms of last mirror send; read by packetSentInTime()
+ }
+
+ /**
+ * Returns false, logging the gap, if more than maxSendIdleTime (0.9 * socket
+ * timeout) ms have passed since a packet was last sent to the mirror (or since
+ * construction), e.g. because upstream sent nothing for a long time, or a
+ * received packet got stuck in local disk I/O or on the send to the mirror.
+ */
+ synchronized boolean packetSentInTime() {
+ long diff = Time.monotonicNow() - lastSentTime; // ms since last send
+ if (diff > maxSendIdleTime) {
+ LOG.info("A packet was last sent " + diff + " milliseconds ago.");
+ return false;
+ }
+ return true;
+ }
+
/**
* Flush block data and metadata files to disk.
* @throws IOException
@@ -509,13 +538,21 @@ class BlockReceiver implements Closeable {
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
+ // Drop heartbeat for testing.
+ if (seqno < 0 && len == 0 &&
+ DataNodeFaultInjector.get().dropHeartbeatPacket()) {
+ return 0;
+ }
+
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
long begin = Time.monotonicNow();
packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
- long duration = Time.monotonicNow() - begin;
+ long now = Time.monotonicNow();
+ setLastSentTime(now);
+ long duration = now - begin;
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
@@ -1277,6 +1314,17 @@ class BlockReceiver implements Closeable {
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true;
+ } else if (ioe instanceof EOFException && !packetSentInTime()) {
+ // The downstream error was caused by upstream including this
+ // node not sending packet in time. Let the upstream determine
+ // who is at fault. If the immediate upstream node thinks it
+ // has sent a packet in time, this node will be reported as bad.
+ // Otherwise, the upstream node will propagate the error up by
+ // closing the connection.
+ LOG.warn("The downstream error might be due to congestion in " +
+ "upstream including this node. Propagating the error: ",
+ ioe);
+ throw ioe;
} else {
// continue to run even if can not read from mirror
// notify client of the error
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30626ad3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index fb81763..e66d66b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -38,6 +38,14 @@ public class DataNodeFaultInjector {
public void getHdfsBlocksMetadata() {}
+ public static void set(DataNodeFaultInjector injector) { // lets tests install a custom injector
+ instance = injector;
+ }
+
+ public boolean dropHeartbeatPacket() {
+ return false; // default: never drop; when true, BlockReceiver drops heartbeats before mirroring
+ }
+
public void sendShortCircuitShmResponse() throws IOException {}
public void noRegistration() throws IOException { }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30626ad3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 04853bd..d71bc4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -21,10 +21,13 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -162,6 +165,66 @@ public class TestClientProtocolForPipelineRecovery {
}
}
+ @Test
+ public void testPacketTransmissionDelay() throws Exception {
+ // Make the first datanode not relay heartbeat packets.
+ DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+ @Override
+ public boolean dropHeartbeatPacket() {
+ return true;
+ }
+ };
+ DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get(); // restored in finally
+ DataNodeFaultInjector.set(dnFaultInjector);
+
+ // Set the socket timeout to 3 seconds. Normally a heartbeat packet
+ // would be sent every 1.5 seconds when there is no data traffic.
+ Configuration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000");
+ MiniDFSCluster cluster = null;
+
+ try {
+ int numDataNodes = 2;
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+
+ FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2);
+ out.write(0x31);
+ out.hflush();
+
+ DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
+
+ // Pipeline before the idle period.
+ DatanodeInfo[] orgNodes = dfsOut.getPipeline();
+
+ // Stay idle past the 3s timeout so the second datanode times out reading.
+ Thread.sleep(3500);
+ out.write(0x32);
+ out.hflush();
+
+ // Pipeline after the second hflush.
+ DatanodeInfo[] newNodes = dfsOut.getPipeline();
+ out.close();
+
+ boolean contains = false; // set if orgNodes[1] is still in the pipeline
+ for (int i = 0; i < newNodes.length; i++) {
+ if (orgNodes[0].getXferAddr().equals(newNodes[i].getXferAddr())) {
+ throw new IOException("The first datanode should have been replaced.");
+ }
+ if (orgNodes[1].getXferAddr().equals(newNodes[i].getXferAddr())) {
+ contains = true;
+ }
+ }
+ Assert.assertTrue(contains);
+ } finally {
+ DataNodeFaultInjector.set(oldDnInjector);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
/**
* Test recovery on restart OOB message. It also tests the delivery of
* OOB ack originating from the primary datanode. Since there is only