Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2014/08/13 20:07:57 UTC

svn commit: r1617785 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/balancer/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/proto/ src/test/java/org/apache/hadoop/h...

Author: vinayakumarb
Date: Wed Aug 13 18:07:57 2014
New Revision: 1617785

URL: http://svn.apache.org/r1617785
Log:
Merged revision(s) 1617784 from hadoop/common/trunk:
HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate responses to Balancer (Contributed by Vinayakumar B.)
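
In short, the DataNode now streams an IN_PROGRESS status back to the caller while a
throttled block move is still running, and the caller simply drains these keep-alive
responses until a terminal status arrives. A minimal sketch of that caller-side pattern,
mirroring the Dispatcher.receiveResponse() and TestBlockReplacement.replaceBlock()
changes below (the helper name and the "reply" stream are illustrative only;
BlockOpResponseProto and Status are the DataTransferProtos classes imported elsewhere
in this patch):

    // Sketch only: drain IN_PROGRESS keep-alives from a replaceBlock() reply
    // until the DataNode reports a terminal status.
    static boolean blockMoveSucceeded(DataInputStream reply) throws IOException {
      BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
      while (proto.getStatus() == Status.IN_PROGRESS) {
        // Intermediate response: the move is still running, keep waiting.
        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
      }
      return proto.getStatus() == Status.SUCCESS;
    }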

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1617785&r1=1617784&r2=1617785&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 13 18:07:57 2014
@@ -245,6 +245,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
     block replica (Arpit Agarwal)
 
+    HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate
+    responses to Balancer (vinayakumarb)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1617785&r1=1617784&r2=1617785&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Wed Aug 13 18:07:57 2014
@@ -87,8 +87,6 @@ public class Dispatcher {
 
   private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
   private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
-  private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20
-                                                                     // minutes
 
   private final NameNodeConnector nnc;
   private final SaslDataTransferClient saslClient;
@@ -278,13 +276,6 @@ public class Dispatcher {
         sock.connect(
             NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
-        /*
-         * Unfortunately we don't have a good way to know if the Datanode is
-         * taking a really long time to move a block, OR something has gone
-         * wrong and it's never going to finish. To deal with this scenario, we
-         * set a long timeout (20 minutes) to avoid hanging indefinitely.
-         */
-        sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
 
         sock.setKeepAlive(true);
 
@@ -341,8 +332,12 @@ public class Dispatcher {
 
     /** Receive a block copy response from the input stream */
     private void receiveResponse(DataInputStream in) throws IOException {
-      BlockOpResponseProto response = BlockOpResponseProto
-          .parseFrom(vintPrefixed(in));
+      BlockOpResponseProto response =
+          BlockOpResponseProto.parseFrom(vintPrefixed(in));
+      while (response.getStatus() == Status.IN_PROGRESS) {
+        // read intermediate responses
+        response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+      }
       if (response.getStatus() != Status.SUCCESS) {
         if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
           throw new IOException("block move failed due to access token error");
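
Note the design choice above: the fixed 20-minute BLOCK_MOVE_READ_TIMEOUT (and the
setSoTimeout() call that used it) is removed rather than enlarged. Because the DataNode
now reports progress with IN_PROGRESS responses while the move is throttled, the
Balancer no longer has to guess how long a legitimate move may take; receiveResponse()
simply discards any number of intermediate responses before acting on the final status.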

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1617785&r1=1617784&r2=1617785&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Aug 13 18:07:57 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -123,6 +124,14 @@ class BlockReceiver implements Closeable
   private boolean syncOnClose;
   private long restartBudget;
 
+  /**
+   * for replaceBlock response
+   */
+  private final long responseInterval;
+  private long lastResponseTime = 0;
+  private boolean isReplaceBlock = false;
+  private DataOutputStream replyOut = null;
+
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
       final String inAddr, final String myAddr,
@@ -144,6 +153,9 @@ class BlockReceiver implements Closeable
       this.isClient = !this.isDatanode;
       this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
       this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
+      // For replaceBlock() calls, a response should be sent periodically to
+      // avoid a socketTimeout at the client, so send one every 0.5 * socketTimeout.
+      this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
       //for datanode, we have
       //1: clientName.length() == 0, and
       //2: stage == null or PIPELINE_SETUP_CREATE
@@ -651,6 +663,20 @@ class BlockReceiver implements Closeable
           lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
+    /*
+     * Send in-progress responses for the replaceBlock() calls back to the
+     * caller to avoid timeouts due to balancer throttling. HDFS-6847
+     */
+    if (isReplaceBlock
+        && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
+      BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
+          .setStatus(Status.IN_PROGRESS);
+      response.build().writeDelimitedTo(replyOut);
+      replyOut.flush();
+
+      lastResponseTime = Time.monotonicNow();
+    }
+
     if (throttler != null) { // throttle I/O
       throttler.throttle(len);
     }
@@ -718,7 +744,8 @@ class BlockReceiver implements Closeable
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      DatanodeInfo[] downstreams) throws IOException {
+      DatanodeInfo[] downstreams,
+      boolean isReplaceBlock) throws IOException {
 
       syncOnClose = datanode.getDnConf().syncOnClose;
       boolean responderClosed = false;
@@ -726,6 +753,9 @@ class BlockReceiver implements Closeable
       mirrorAddr = mirrAddr;
       throttler = throttlerArg;
 
+      this.replyOut = replyOut;
+      this.isReplaceBlock = isReplaceBlock;
+
     try {
       if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup, 
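
As a rough worked example of the interval chosen above: the DataNode emits an
IN_PROGRESS response whenever more than responseInterval has elapsed since the last
response, and responseInterval is half of the DataNode's socketTimeout setting.
Assuming the usual 60-second default for dfs.client.socket-timeout, that is a
keep-alive roughly every 30 seconds, so the caller keeps seeing traffic on the
connection even when the balancer throttler slows the copy to a crawl.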

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1617785&r1=1617784&r2=1617785&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 13 18:07:57 2014
@@ -708,7 +708,7 @@ class DataXceiver extends Receiver imple
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets);
+            mirrorAddr, null, targets, false);
 
         // send close-ack for transfer-RBW/Finalized 
         if (isTransfer) {
@@ -983,7 +983,7 @@ class DataXceiver extends Receiver imple
     String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
-    
+    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     try {
       // get the output stream to the proxy
       final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1040,8 +1040,8 @@ class DataXceiver extends Receiver imple
           CachingStrategy.newDropBehind());
 
       // receive a block
-      blockReceiver.receiveBlock(null, null, null, null, 
-          dataXceiverServer.balanceThrottler, null);
+      blockReceiver.receiveBlock(null, null, replyOut, null, 
+          dataXceiverServer.balanceThrottler, null, true);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(
@@ -1076,6 +1076,7 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(proxyReply);
+      IOUtils.closeStream(replyOut);
     }
 
     //update metrics

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1617785&r1=1617784&r2=1617785&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Wed Aug 13 18:07:57 2014
@@ -207,6 +207,7 @@ enum Status {
   OOB_RESERVED1 = 9;          // Reserved
   OOB_RESERVED2 = 10;         // Reserved
   OOB_RESERVED3 = 11;         // Reserved
+  IN_PROGRESS = 12;
 }
 
 message PipelineAckProto {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1617785&r1=1617784&r2=1617785&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Wed Aug 13 18:07:57 2014
@@ -272,8 +272,10 @@ public class TestBlockReplacement {
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());
 
-    BlockOpResponseProto proto =
-      BlockOpResponseProto.parseDelimitedFrom(reply);
+    BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+    while (proto.getStatus() == Status.IN_PROGRESS) {
+      proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+    }
     return proto.getStatus() == Status.SUCCESS;
   }