Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 08:59:55 UTC
[21/50] hadoop git commit: HDFS-9752. Permanent write failures may happen to slow writers during datanode rolling upgrades. Contributed by Walter Su.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/193d27de
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/193d27de
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/193d27de
Branch: refs/heads/yarn-2877
Commit: 193d27de0a5d23a61cabd41162ebc3292d8526d1
Parents: cf32615
Author: Kihwal Lee <ki...@apache.org>
Authored: Mon Feb 8 12:16:05 2016 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Mon Feb 8 12:16:29 2016 -0600
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DataStreamer.java | 42 +++++++-------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/server/datanode/DataNode.java | 2 +-
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 22 +++++++
.../TestClientProtocolForPipelineRecovery.java | 60 +++++++++++++++++++-
.../apache/hadoop/hdfs/TestRollingUpgrade.java | 4 +-
6 files changed, 109 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index bac4d12..e3843de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
@@ -368,9 +369,8 @@ class DataStreamer extends Daemon {
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<>();
- /** The last ack sequence number before pipeline failure. */
- private long lastAckedSeqnoBeforeFailure = -1;
- private int pipelineRecoveryCount = 0;
+ /** The number of times pipeline recovery has been retried for the same packet. */
+ private volatile int pipelineRecoveryCount = 0;
/** Has the current block been hflushed? */
private boolean isHflushed = false;
/** Append on an existing block? */
@@ -1040,6 +1040,7 @@ class DataStreamer extends Daemon {
one.setTraceScope(null);
}
lastAckedSeqno = seqno;
+ pipelineRecoveryCount = 0;
ackQueue.removeFirst();
dataQueue.notifyAll();
@@ -1101,22 +1102,16 @@ class DataStreamer extends Daemon {
ackQueue.clear();
}
- // Record the new pipeline failure recovery.
- if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
- lastAckedSeqnoBeforeFailure = lastAckedSeqno;
- pipelineRecoveryCount = 1;
- } else {
- // If we had to recover the pipeline five times in a row for the
- // same packet, this client likely has corrupt data or corrupting
- // during transmission.
- if (++pipelineRecoveryCount > 5) {
- LOG.warn("Error recovering pipeline for writing " +
- block + ". Already retried 5 times for the same packet.");
- lastException.set(new IOException("Failing write. Tried pipeline " +
- "recovery 5 times without success."));
- streamerClosed = true;
- return false;
- }
+ // If we had to recover the pipeline five times in a row for the
+ // same packet, this client likely has corrupt data or the data is
+ // being corrupted during transmission.
+ if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
+ LOG.warn("Error recovering pipeline for writing " +
+ block + ". Already retried 5 times for the same packet.");
+ lastException.set(new IOException("Failing write. Tried pipeline " +
+ "recovery 5 times without success."));
+ streamerClosed = true;
+ return false;
}
setupPipelineForAppendOrRecovery();
@@ -1144,6 +1139,7 @@ class DataStreamer extends Daemon {
assert endOfBlockPacket.isLastPacketInBlock();
assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
lastAckedSeqno = endOfBlockPacket.getSeqno();
+ pipelineRecoveryCount = 0;
dataQueue.notifyAll();
}
endBlock();
@@ -1914,6 +1910,14 @@ class DataStreamer extends Daemon {
return streamerClosed;
}
+ /**
+ * @return the number of times pipeline recovery has been retried for the same packet.
+ */
+ @VisibleForTesting
+ int getPipelineRecoveryCount() {
+ return pipelineRecoveryCount;
+ }
+
void closeSocket() throws IOException {
if (s != null) {
s.close();
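
The hunks above carry the heart of the fix: the per-packet counter pipelineRecoveryCount is reset whenever an ack makes progress, and recovery attempts made while a datanode is restarting for an upgrade are not counted against the limit of five. A minimal sketch of that accounting (RecoveryCounter, onPacketAcked and onRecoveryAttempt are illustrative names, not the actual DataStreamer API):

// Minimal sketch of the new retry accounting (simplified names; not the
// actual DataStreamer). Two rules from the hunks above:
//  1. any acked packet resets the per-packet recovery count, and
//  2. recoveries triggered while a datanode is restarting for an upgrade
//     do not count against the limit.
class RecoveryCounter {
  private static final int MAX_RECOVERIES_PER_PACKET = 5;
  private volatile int pipelineRecoveryCount = 0;

  // Called when an ack advances past a packet: progress resets the count.
  void onPacketAcked() {
    pipelineRecoveryCount = 0;
  }

  // Called at the start of pipeline recovery for the current packet.
  // Returns false if the write should be failed permanently.
  boolean onRecoveryAttempt(boolean nodeRestartingForUpgrade) {
    if (!nodeRestartingForUpgrade
        && ++pipelineRecoveryCount > MAX_RECOVERIES_PER_PACKET) {
      return false; // likely corrupt data; give up on this write
    }
    return true;
  }
}

A slow writer can therefore keep recovering the same packet for as long as the rolling upgrade lasts, and the five-attempt limit still catches genuinely corrupt pipelines once no node is restarting.
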
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6c448eb..07f01ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2789,6 +2789,9 @@ Release 2.7.3 - UNRELEASED
HDFS-9724. Degraded performance in WebHDFS listing as it does not reuse
ObjectMapper. (Akira AJISAKA via wheat9)
+ HDFS-9752. Permanent write failures may happen to slow writers during
+ datanode rolling upgrades (Walter Su via kihwal)
+
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b2425bf..480465c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2921,7 +2921,7 @@ public class DataNode extends ReconfigurableBase
// Asynchronously start the shutdown process so that the rpc response can be
// sent back.
- Thread shutdownThread = new Thread() {
+ Thread shutdownThread = new Thread("Async datanode shutdown thread") {
@Override public void run() {
if (!shutdownForUpgrade) {
// Delay the shutdown a bit if not doing for restart.
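
Naming the shutdown thread is what lets the tests below replace fixed Thread.sleep() calls with a deterministic wait for the thread to exit (via GenericTestUtils.waitForThreadTermination). A sketch of how such a wait can be built from standard JDK calls (ThreadWait and awaitTermination are illustrative names, not Hadoop API):

import java.util.concurrent.TimeUnit;

// Illustrative helper: block until no live thread with the given name
// remains, polling every checkEveryMs, failing after timeoutMs.
final class ThreadWait {
  static void awaitTermination(String name, long checkEveryMs, long timeoutMs)
      throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (System.nanoTime() < deadline) {
      boolean alive = false;
      for (Thread t : Thread.getAllStackTraces().keySet()) {
        if (t.isAlive() && t.getName().equals(name)) {
          alive = true;
          break;
        }
      }
      if (!alive) {
        return; // shutdown thread has finished
      }
      Thread.sleep(checkEveryMs);
    }
    throw new IllegalStateException(
        "Thread '" + name + "' did not terminate in " + timeoutMs + " ms");
  }
}
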
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 63561fe..2eff12e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2187,6 +2187,28 @@ public class MiniDFSCluster {
return stopDataNode(node);
}
+ /**
+ * Shut down a particular datanode, as during a rolling upgrade.
+ * @param i node index
+ * @return null if the node index is out of range, else the properties of the
+ * removed node
+ */
+ public synchronized DataNodeProperties stopDataNodeForUpgrade(int i)
+ throws IOException {
+ if (i < 0 || i >= dataNodes.size()) {
+ return null;
+ }
+ DataNodeProperties dnprop = dataNodes.remove(i);
+ DataNode dn = dnprop.datanode;
+ LOG.info("MiniDFSCluster Stopping DataNode " +
+ dn.getDisplayName() +
+ " from a total of " + (dataNodes.size() + 1) +
+ " datanodes.");
+ dn.shutdownDatanode(true);
+ numDataNodes--;
+ return dnprop;
+ }
+
/**
* Restart a datanode
* @param dnprop datanode's property
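
Unlike stopDataNode(int), the new helper calls shutdownDatanode(true), so the node goes down the same way it would during a rolling upgrade and signals the restart to writers via an OOB ack. Typical usage, as in the new test further down (a fragment that assumes a running MiniDFSCluster named cluster):

// Simulate a rolling-upgrade restart of datanode 0 in a test.
MiniDFSCluster.DataNodeProperties dnProps =
    cluster.stopDataNodeForUpgrade(0);   // null if the index is out of range
GenericTestUtils.waitForThreadTermination(
    "Async datanode shutdown thread", 100, 10000);
cluster.restartDataNode(dnProps, true);  // keep the same ports/addresses
cluster.waitActive();
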
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 22009fd..d7aa79a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
+import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -256,7 +258,8 @@ public class TestClientProtocolForPipelineRecovery {
final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args1));
// Wait long enough to receive an OOB ack before closing the file.
- Thread.sleep(4000);
+ GenericTestUtils.waitForThreadTermination(
+ "Async datanode shutdown thread", 100, 10000);
// Restart the datanode
cluster.restartDataNode(0, true);
// The following forces a data packet and end of block packets to be sent.
@@ -293,7 +296,8 @@ public class TestClientProtocolForPipelineRecovery {
// issue shutdown to the datanode.
final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args1));
- Thread.sleep(4000);
+ GenericTestUtils.waitForThreadTermination(
+ "Async datanode shutdown thread", 100, 10000);
// This should succeed without restarting the node. The restart will
// expire and regular pipeline recovery will kick in.
out.close();
@@ -309,7 +313,8 @@ public class TestClientProtocolForPipelineRecovery {
// issue shutdown to the datanode.
final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args2));
- Thread.sleep(4000);
+ GenericTestUtils.waitForThreadTermination(
+ "Async datanode shutdown thread", 100, 10000);
try {
// close should fail
out.close();
@@ -321,4 +326,53 @@ public class TestClientProtocolForPipelineRecovery {
}
}
}
+
+ /**
+ * HDFS-9752. The client keeps sending heartbeat packets during datanode
+ * rolling upgrades. The client should be able to retry pipeline recovery
+ * more times than the default limit allows, even repeatedly for the same
+ * packet (including heartbeat packets).
+ * (See {@link DataStreamer#pipelineRecoveryCount}.)
+ */
+ @Test(timeout = 60000)
+ public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+ cluster.waitActive();
+ FileSystem fileSys = cluster.getFileSystem();
+
+ Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+ DFSTestUtil.createFile(fileSys, file, 10240L, (short) 2, 0L);
+ final DFSOutputStream out = (DFSOutputStream) (fileSys.append(file).
+ getWrappedStream());
+ out.write(1);
+ out.hflush();
+
+ final long oldGs = out.getBlock().getGenerationStamp();
+ MiniDFSCluster.DataNodeProperties dnProps =
+ cluster.stopDataNodeForUpgrade(0);
+ GenericTestUtils.waitForThreadTermination(
+ "Async datanode shutdown thread", 100, 10000);
+ cluster.restartDataNode(dnProps, true);
+ cluster.waitActive();
+
+ // wait for the pipeline to be recovered
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return out.getBlock().getGenerationStamp() > oldGs;
+ }
+ }, 100, 10000);
+ Assert.assertEquals("The pipeline recovery count shouldn't increase",
+ 0, out.getStreamer().getPipelineRecoveryCount());
+ out.write(1);
+ out.close();
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
index b3279ed..b356fb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -407,7 +408,8 @@ public class TestRollingUpgrade {
runCmd(dfsadmin, true, args2);
// the datanode should be down.
- Thread.sleep(2000);
+ GenericTestUtils.waitForThreadTermination(
+ "Async datanode shutdown thread", 100, 10000);
Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
// ping should fail.