Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 08:59:55 UTC

[21/50] hadoop git commit: HDFS-9752. Permanent write failures may happen to slow writers during datanode rolling upgrades. Contributed by Walter Su.

HDFS-9752. Permanent write failures may happen to slow writers during datanode rolling upgrades. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/193d27de
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/193d27de
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/193d27de

Branch: refs/heads/yarn-2877
Commit: 193d27de0a5d23a61cabd41162ebc3292d8526d1
Parents: cf32615
Author: Kihwal Lee <ki...@apache.org>
Authored: Mon Feb 8 12:16:05 2016 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Mon Feb 8 12:16:29 2016 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 42 +++++++-------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |  2 +-
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  | 22 +++++++
 .../TestClientProtocolForPipelineRecovery.java  | 60 +++++++++++++++++++-
 .../apache/hadoop/hdfs/TestRollingUpgrade.java  |  4 +-
 6 files changed, 109 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index bac4d12..e3843de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
@@ -368,9 +369,8 @@ class DataStreamer extends Daemon {
 
   /** Nodes have been used in the pipeline before and have failed. */
   private final List<DatanodeInfo> failed = new ArrayList<>();
-  /** The last ack sequence number before pipeline failure. */
-  private long lastAckedSeqnoBeforeFailure = -1;
-  private int pipelineRecoveryCount = 0;
+  /** Number of times pipeline recovery has been retried for the same packet. */
+  private volatile int pipelineRecoveryCount = 0;
   /** Has the current block been hflushed? */
   private boolean isHflushed = false;
   /** Append on an existing block? */
@@ -1040,6 +1040,7 @@ class DataStreamer extends Daemon {
               one.setTraceScope(null);
             }
             lastAckedSeqno = seqno;
+            pipelineRecoveryCount = 0;
             ackQueue.removeFirst();
             dataQueue.notifyAll();
 
@@ -1101,22 +1102,16 @@ class DataStreamer extends Daemon {
       ackQueue.clear();
     }
 
-    // Record the new pipeline failure recovery.
-    if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
-      lastAckedSeqnoBeforeFailure = lastAckedSeqno;
-      pipelineRecoveryCount = 1;
-    } else {
-      // If we had to recover the pipeline five times in a row for the
-      // same packet, this client likely has corrupt data or corrupting
-      // during transmission.
-      if (++pipelineRecoveryCount > 5) {
-        LOG.warn("Error recovering pipeline for writing " +
-            block + ". Already retried 5 times for the same packet.");
-        lastException.set(new IOException("Failing write. Tried pipeline " +
-            "recovery 5 times without success."));
-        streamerClosed = true;
-        return false;
-      }
+    // If we had to recover the pipeline five times in a row for the
+    // same packet, this client likely has corrupt data or is corrupting
+    // it during transmission.
+    if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
+      LOG.warn("Error recovering pipeline for writing " +
+          block + ". Already retried 5 times for the same packet.");
+      lastException.set(new IOException("Failing write. Tried pipeline " +
+          "recovery 5 times without success."));
+      streamerClosed = true;
+      return false;
     }
 
     setupPipelineForAppendOrRecovery();
@@ -1144,6 +1139,7 @@ class DataStreamer extends Daemon {
           assert endOfBlockPacket.isLastPacketInBlock();
           assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
           lastAckedSeqno = endOfBlockPacket.getSeqno();
+          pipelineRecoveryCount = 0;
           dataQueue.notifyAll();
         }
         endBlock();
@@ -1914,6 +1910,14 @@ class DataStreamer extends Daemon {
     return streamerClosed;
   }
 
+  /**
+   * @return Number of times pipeline recovery has been retried for the same packet.
+   */
+  @VisibleForTesting
+  int getPipelineRecoveryCount() {
+    return pipelineRecoveryCount;
+  }
+
   void closeSocket() throws IOException {
     if (s != null) {
       s.close();

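The two hunks above carry the substance of the fix: the recovery counter now resets whenever a packet is acked (forward progress), and a failure is not counted against the five-attempt cap while the error state reports a restarting node, which is exactly the rolling-upgrade case. A minimal standalone sketch of that accounting, with illustrative names that are not the DataStreamer API:

class PipelineRecoveryAccounting {
  private static final int MAX_RECOVERIES_PER_PACKET = 5;

  // volatile: written by the streamer thread, read by tests/monitors.
  private volatile int pipelineRecoveryCount = 0;

  /** Any acked packet is forward progress, so the counter resets. */
  void onPacketAcked() {
    pipelineRecoveryCount = 0;
  }

  /**
   * Account for one pipeline failure. A failure caused by a datanode that
   * announced a restart (rolling upgrade) is expected and is not counted.
   * @return false if the write should be failed permanently
   */
  boolean onPipelineFailure(boolean nodeIsRestarting) {
    if (!nodeIsRestarting
        && ++pipelineRecoveryCount > MAX_RECOVERIES_PER_PACKET) {
      // Five failed recoveries for the same packet: almost certainly not a
      // transient condition, so give up instead of retrying forever.
      return false;
    }
    return true;
  }

  int getPipelineRecoveryCount() {
    return pipelineRecoveryCount;
  }
}

During an upgrade the restarting datanode announces itself via an OOB ack, so a slow writer can keep retrying recovery for the same packet without tripping the cap; once the node is back and a packet is acked, the count drops to zero again.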
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6c448eb..07f01ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2789,6 +2789,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-9724. Degraded performance in WebHDFS listing as it does not reuse
     ObjectMapper. (Akira AJISAKA via wheat9)
 
+    HDFS-9752. Permanent write failures may happen to slow writers during
+    datanode rolling upgrades (Walter Su via kihwal)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b2425bf..480465c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2921,7 +2921,7 @@ public class DataNode extends ReconfigurableBase
 
     // Asynchronously start the shutdown process so that the rpc response can be
     // sent back.
-    Thread shutdownThread = new Thread() {
+    Thread shutdownThread = new Thread("Async datanode shutdown thread") {
       @Override public void run() {
         if (!shutdownForUpgrade) {
           // Delay the shutdown a bit if not doing for restart.

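Naming the previously anonymous shutdown thread is what enables the test changes below: instead of sleeping for a fixed four seconds, a test can wait until no live thread matches that name. A rough standalone equivalent of the GenericTestUtils.waitForThreadTermination calls used below (the helper name and exception choice here are illustrative, not the Hadoop test API):

import java.util.concurrent.TimeUnit;

final class ThreadTermination {
  static void await(String nameRegex, long checkEveryMillis,
      long waitForMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + waitForMillis;
    while (System.currentTimeMillis() < deadline) {
      boolean alive = false;
      // Enumerate all live threads in the JVM and look for a name match.
      for (Thread t : Thread.getAllStackTraces().keySet()) {
        if (t.getName().matches(nameRegex)) {
          alive = true;
          break;
        }
      }
      if (!alive) {
        return; // no thread with that name is running any more
      }
      TimeUnit.MILLISECONDS.sleep(checkEveryMillis);
    }
    throw new IllegalStateException("Thread matching \"" + nameRegex
        + "\" is still alive after " + waitForMillis + " ms");
  }
}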
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 63561fe..2eff12e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2187,6 +2187,28 @@ public class MiniDFSCluster {
     return stopDataNode(node);
   }
 
+  /**
+   * Shut down a particular datanode through the rolling-upgrade restart
+   * path, so that its clients are sent the "restarting" OOB ack.
+   * @param i node index
+   * @return null if the node index is out of range, else the properties of the
+   * removed node
+   */
+  public synchronized DataNodeProperties stopDataNodeForUpgrade(int i)
+      throws IOException {
+    if (i < 0 || i >= dataNodes.size()) {
+      return null;
+    }
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
+    LOG.info("MiniDFSCluster Stopping DataNode " +
+        dn.getDisplayName() +
+        " from a total of " + (dataNodes.size() + 1) +
+        " datanodes.");
+    dn.shutdownDatanode(true);
+    numDataNodes--;
+    return dnprop;
+  }
+
   /**
    * Restart a datanode
    * @param dnprop datanode's property

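Unlike the existing stopDataNode(int), which simply tears the node down, the new helper goes through DataNode#shutdownDatanode(true), the same path a real rolling upgrade takes, so writers receive the restart OOB ack. A sketch of the intended usage in a test, mirroring the new test case below (the wrapper class and method are illustrative):

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;

final class UpgradeRestart {
  static void restartForUpgrade(MiniDFSCluster cluster) throws Exception {
    // Goes through shutdownDatanode(true), so clients get the restart OOB ack.
    MiniDFSCluster.DataNodeProperties dnProps =
        cluster.stopDataNodeForUpgrade(0);
    // Deterministic wait on the newly named shutdown thread, not a sleep.
    GenericTestUtils.waitForThreadTermination(
        "Async datanode shutdown thread", 100, 10000);
    cluster.restartDataNode(dnProps, true); // true: keep the same ports
    cluster.waitActive();
  }
}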
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 22009fd..d7aa79a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -256,7 +258,8 @@ public class TestClientProtocolForPipelineRecovery {
       final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
       // Wait long enough to receive an OOB ack before closing the file.
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
      // Restart the datanode
      cluster.restartDataNode(0, true);
      // The following forces a data packet and an end-of-block packet to be sent.
@@ -293,7 +296,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // This should succeed without restarting the node. The restart will
       // expire and regular pipeline recovery will kick in. 
       out.close();
@@ -309,7 +313,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args2));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       try {
         // close should fail
         out.close();
@@ -321,4 +326,53 @@ public class TestClientProtocolForPipelineRecovery {
       }
     }
   }
+
+  /**
+   * HDFS-9752. The client keeps sending heartbeat packets during datanode
+   * rolling upgrades, so it should be able to retry pipeline recovery more
+   * times than the default limit allows (in a row, for the same packet,
+   * heartbeat packets included).
+   * (See {@link DataStreamer#pipelineRecoveryCount}.)
+   */
+  @Test(timeout = 60000)
+  public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 2, 0L);
+      final DFSOutputStream out = (DFSOutputStream) (fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      final long oldGs = out.getBlock().getGenerationStamp();
+      MiniDFSCluster.DataNodeProperties dnProps =
+          cluster.stopDataNodeForUpgrade(0);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
+      cluster.restartDataNode(dnProps, true);
+      cluster.waitActive();
+
+      // wait for the pipeline to be recovered
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return out.getBlock().getGenerationStamp() > oldGs;
+        }
+      }, 100, 10000);
+      Assert.assertEquals("The pipeline recovery count shouldn't increase",
+          0, out.getStreamer().getPipelineRecoveryCount());
+      out.write(1);
+      out.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

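Note how the new test waits on the block's generation stamp rather than sleeping: each pipeline recovery assigns the block a new generation stamp, so a stamp newer than the one captured before the restart proves the recovery completed before the assertions run. The same condition as a Java 8 sketch (the test itself uses Guava's Supplier, presumably for compatibility with the branch's older JDK baseline):

import java.util.function.BooleanSupplier;
import org.apache.hadoop.hdfs.DFSOutputStream;

final class RecoveryCheck {
  // A pipeline recovery assigns the block a new generation stamp, so
  // observing a stamp newer than oldGs means recovery has completed.
  static BooleanSupplier recoveredSince(final DFSOutputStream out,
      final long oldGs) {
    return () -> out.getBlock().getGenerationStamp() > oldGs;
  }
}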
http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d27de/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
index b3279ed..b356fb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -407,7 +408,8 @@ public class TestRollingUpgrade {
       runCmd(dfsadmin, true, args2);
 
       // the datanode should be down.
-      Thread.sleep(2000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
 
       // ping should fail.