You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2014/08/13 20:06:33 UTC
svn commit: r1617784 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/balancer/
src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/proto/
src/test/java/org/apache/hadoop/hdfs/server/d...
Author: vinayakumarb
Date: Wed Aug 13 18:06:33 2014
New Revision: 1617784
URL: http://svn.apache.org/r1617784
Log:
HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate responses to Balancer (Contributed by Vinayakumar B.)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1617784&r1=1617783&r2=1617784&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 13 18:06:33 2014
@@ -500,6 +500,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
block replica (Arpit Agarwal)
+ HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate
+ responses to Balancer (vinayakumarb)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1617784&r1=1617783&r2=1617784&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Wed Aug 13 18:06:33 2014
@@ -87,8 +87,6 @@ public class Dispatcher {
private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
- private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20
- // minutes
private final NameNodeConnector nnc;
private final SaslDataTransferClient saslClient;
@@ -278,13 +276,6 @@ public class Dispatcher {
sock.connect(
NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
- /*
- * Unfortunately we don't have a good way to know if the Datanode is
- * taking a really long time to move a block, OR something has gone
- * wrong and it's never going to finish. To deal with this scenario, we
- * set a long timeout (20 minutes) to avoid hanging indefinitely.
- */
- sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
sock.setKeepAlive(true);
@@ -341,8 +332,12 @@ public class Dispatcher {
/** Receive a block copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
- BlockOpResponseProto response = BlockOpResponseProto
- .parseFrom(vintPrefixed(in));
+ BlockOpResponseProto response =
+ BlockOpResponseProto.parseFrom(vintPrefixed(in));
+ while (response.getStatus() == Status.IN_PROGRESS) {
+ // read intermediate responses
+ response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+ }
if (response.getStatus() != Status.SUCCESS) {
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new IOException("block move failed due to access token error");
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1617784&r1=1617783&r2=1617784&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Aug 13 18:06:33 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -123,6 +124,14 @@ class BlockReceiver implements Closeable
private boolean syncOnClose;
private long restartBudget;
+ /**
+ * for replaceBlock response
+ */
+ private final long responseInterval;
+ private long lastResponseTime = 0;
+ private boolean isReplaceBlock = false;
+ private DataOutputStream replyOut = null;
+
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
final String inAddr, final String myAddr,
@@ -144,6 +153,9 @@ class BlockReceiver implements Closeable
this.isClient = !this.isDatanode;
this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
+ // For replaceBlock() calls response should be sent to avoid socketTimeout
+ // at clients. So sending with the interval of 0.5 * socketTimeout
+ this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
//for datanode, we have
//1: clientName.length() == 0, and
//2: stage == null or PIPELINE_SETUP_CREATE
@@ -651,6 +663,20 @@ class BlockReceiver implements Closeable
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
+ /*
+ * Send in-progress responses for the replaceBlock() calls back to caller to
+ * avoid timeouts due to balancer throttling. HDFS-6847
+ */
+ if (isReplaceBlock
+ && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
+ BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
+ .setStatus(Status.IN_PROGRESS);
+ response.build().writeDelimitedTo(replyOut);
+ replyOut.flush();
+
+ lastResponseTime = Time.monotonicNow();
+ }
+
if (throttler != null) { // throttle I/O
throttler.throttle(len);
}
@@ -718,7 +744,8 @@ class BlockReceiver implements Closeable
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
- DatanodeInfo[] downstreams) throws IOException {
+ DatanodeInfo[] downstreams,
+ boolean isReplaceBlock) throws IOException {
syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
@@ -726,6 +753,9 @@ class BlockReceiver implements Closeable
mirrorAddr = mirrAddr;
throttler = throttlerArg;
+ this.replyOut = replyOut;
+ this.isReplaceBlock = isReplaceBlock;
+
try {
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1617784&r1=1617783&r2=1617784&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 13 18:06:33 2014
@@ -708,7 +708,7 @@ class DataXceiver extends Receiver imple
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets);
+ mirrorAddr, null, targets, false);
// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
@@ -983,7 +983,7 @@ class DataXceiver extends Receiver imple
String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
-
+ DataOutputStream replyOut = new DataOutputStream(getOutputStream());
try {
// get the output stream to the proxy
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1040,8 +1040,8 @@ class DataXceiver extends Receiver imple
CachingStrategy.newDropBehind());
// receive a block
- blockReceiver.receiveBlock(null, null, null, null,
- dataXceiverServer.balanceThrottler, null);
+ blockReceiver.receiveBlock(null, null, replyOut, null,
+ dataXceiverServer.balanceThrottler, null, true);
// notify name node
datanode.notifyNamenodeReceivedBlock(
@@ -1076,6 +1076,7 @@ class DataXceiver extends Receiver imple
IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
IOUtils.closeStream(proxyReply);
+ IOUtils.closeStream(replyOut);
}
//update metrics
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1617784&r1=1617783&r2=1617784&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Wed Aug 13 18:06:33 2014
@@ -207,6 +207,7 @@ enum Status {
OOB_RESERVED1 = 9; // Reserved
OOB_RESERVED2 = 10; // Reserved
OOB_RESERVED3 = 11; // Reserved
+ IN_PROGRESS = 12;
}
message PipelineAckProto {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1617784&r1=1617783&r2=1617784&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Wed Aug 13 18:06:33 2014
@@ -272,8 +272,10 @@ public class TestBlockReplacement {
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
- BlockOpResponseProto proto =
- BlockOpResponseProto.parseDelimitedFrom(reply);
+ BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+ while (proto.getStatus() == Status.IN_PROGRESS) {
+ proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+ }
return proto.getStatus() == Status.SUCCESS;
}