Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2014/08/13 20:36:51 UTC

svn commit: r1617794 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/balancer/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/proto/ src/test/java/org/apache/hadoop/hdfs/server/d...

Author: vinayakumarb
Date: Wed Aug 13 18:36:51 2014
New Revision: 1617794

URL: http://svn.apache.org/r1617794
Log:
Reverted
Merged revision(s) 1617784 from hadoop/common/trunk:
HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate responses to Balancer (Contributed by Vinayakumar B.)
........
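
For context, the mechanism being reverted worked roughly as sketched below. This is a paraphrase of the removed hunks in BlockReceiver.java and Dispatcher.java further down in this diff, not standalone code; identifiers such as responseInterval, replyOut, isReplaceBlock and IN_PROGRESS are taken directly from those hunks.

    // Datanode side (BlockReceiver): while a throttled replaceBlock() transfer
    // is still running, periodically write an IN_PROGRESS response so the
    // Balancer's socket read does not time out.
    if (isReplaceBlock
        && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
      BlockOpResponseProto.newBuilder()
          .setStatus(Status.IN_PROGRESS)
          .build()
          .writeDelimitedTo(replyOut);
      replyOut.flush();
      lastResponseTime = Time.monotonicNow();
    }

    // Balancer side (Dispatcher.receiveResponse): skip any intermediate
    // IN_PROGRESS responses and act only on the final status.
    BlockOpResponseProto response =
        BlockOpResponseProto.parseFrom(vintPrefixed(in));
    while (response.getStatus() == Status.IN_PROGRESS) {
      response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
    }

With this revert, the IN_PROGRESS status is dropped from the protocol again and the Balancer instead relies on a 20-minute socket read timeout (see the Dispatcher.java hunk below).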

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1617794&r1=1617793&r2=1617794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 13 18:36:51 2014
@@ -500,9 +500,6 @@ Release 2.6.0 - UNRELEASED
     HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
     block replica (Arpit Agarwal)
 
-    HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate
-    responses to Balancer (vinayakumarb)
-
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1617794&r1=1617793&r2=1617794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Wed Aug 13 18:36:51 2014
@@ -87,6 +87,8 @@ public class Dispatcher {
 
   private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
   private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
+  private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20
+                                                                     // minutes
 
   private final NameNodeConnector nnc;
   private final SaslDataTransferClient saslClient;
@@ -276,6 +278,13 @@ public class Dispatcher {
         sock.connect(
             NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
+        /*
+         * Unfortunately we don't have a good way to know if the Datanode is
+         * taking a really long time to move a block, OR something has gone
+         * wrong and it's never going to finish. To deal with this scenario, we
+         * set a long timeout (20 minutes) to avoid hanging indefinitely.
+         */
+        sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
 
         sock.setKeepAlive(true);
 
@@ -332,12 +341,8 @@ public class Dispatcher {
 
     /** Receive a block copy response from the input stream */
     private void receiveResponse(DataInputStream in) throws IOException {
-      BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(vintPrefixed(in));
-      while (response.getStatus() == Status.IN_PROGRESS) {
-        // read intermediate responses
-        response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
-      }
+      BlockOpResponseProto response = BlockOpResponseProto
+          .parseFrom(vintPrefixed(in));
       if (response.getStatus() != Status.SUCCESS) {
         if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
           throw new IOException("block move failed due to access token error");

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1617794&r1=1617793&r2=1617794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Aug 13 18:36:51 2014
@@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -124,14 +123,6 @@ class BlockReceiver implements Closeable
   private boolean syncOnClose;
   private long restartBudget;
 
-  /**
-   * for replaceBlock response
-   */
-  private final long responseInterval;
-  private long lastResponseTime = 0;
-  private boolean isReplaceBlock = false;
-  private DataOutputStream replyOut = null;
-
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
       final String inAddr, final String myAddr,
@@ -153,9 +144,6 @@ class BlockReceiver implements Closeable
       this.isClient = !this.isDatanode;
       this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
       this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
-      // For replaceBlock() calls response should be sent to avoid socketTimeout
-      // at clients. So sending with the interval of 0.5 * socketTimeout
-      this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
       //for datanode, we have
       //1: clientName.length() == 0, and
       //2: stage == null or PIPELINE_SETUP_CREATE
@@ -663,20 +651,6 @@ class BlockReceiver implements Closeable
           lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
-    /*
-     * Send in-progress responses for the replaceBlock() calls back to caller to
-     * avoid timeouts due to balancer throttling. HDFS-6247
-     */
-    if (isReplaceBlock
-        && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
-      BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
-          .setStatus(Status.IN_PROGRESS);
-      response.build().writeDelimitedTo(replyOut);
-      replyOut.flush();
-
-      lastResponseTime = Time.monotonicNow();
-    }
-
     if (throttler != null) { // throttle I/O
       throttler.throttle(len);
     }
@@ -744,8 +718,7 @@ class BlockReceiver implements Closeable
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      DatanodeInfo[] downstreams,
-      boolean isReplaceBlock) throws IOException {
+      DatanodeInfo[] downstreams) throws IOException {
 
       syncOnClose = datanode.getDnConf().syncOnClose;
       boolean responderClosed = false;
@@ -753,9 +726,6 @@ class BlockReceiver implements Closeable
       mirrorAddr = mirrAddr;
       throttler = throttlerArg;
 
-      this.replyOut = replyOut;
-      this.isReplaceBlock = isReplaceBlock;
-
     try {
       if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup, 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1617794&r1=1617793&r2=1617794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 13 18:36:51 2014
@@ -708,7 +708,7 @@ class DataXceiver extends Receiver imple
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets, false);
+            mirrorAddr, null, targets);
 
         // send close-ack for transfer-RBW/Finalized 
         if (isTransfer) {
@@ -983,7 +983,7 @@ class DataXceiver extends Receiver imple
     String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
-    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
+    
     try {
       // get the output stream to the proxy
       final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1040,8 +1040,8 @@ class DataXceiver extends Receiver imple
           CachingStrategy.newDropBehind());
 
       // receive a block
-      blockReceiver.receiveBlock(null, null, replyOut, null, 
-          dataXceiverServer.balanceThrottler, null, true);
+      blockReceiver.receiveBlock(null, null, null, null, 
+          dataXceiverServer.balanceThrottler, null);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(
@@ -1076,7 +1076,6 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(proxyReply);
-      IOUtils.closeStream(replyOut);
     }
 
     //update metrics

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1617794&r1=1617793&r2=1617794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Wed Aug 13 18:36:51 2014
@@ -207,7 +207,6 @@ enum Status {
   OOB_RESERVED1 = 9;          // Reserved
   OOB_RESERVED2 = 10;         // Reserved
   OOB_RESERVED3 = 11;         // Reserved
-  IN_PROGRESS = 12;
 }
 
 message PipelineAckProto {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1617794&r1=1617793&r2=1617794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Wed Aug 13 18:36:51 2014
@@ -272,10 +272,8 @@ public class TestBlockReplacement {
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());
 
-    BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
-    while (proto.getStatus() == Status.IN_PROGRESS) {
-      proto = BlockOpResponseProto.parseDelimitedFrom(reply);
-    }
+    BlockOpResponseProto proto =
+      BlockOpResponseProto.parseDelimitedFrom(reply);
     return proto.getStatus() == Status.SUCCESS;
   }