You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2014/08/13 20:40:28 UTC
svn commit: r1617797 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/balancer/
src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/proto/
src/test/java/org/apache/hadoop/h...
Author: vinayakumarb
Date: Wed Aug 13 18:40:28 2014
New Revision: 1617797
URL: http://svn.apache.org/r1617797
Log:
Reverted
Merged revision(s) 1617794 from hadoop/common/trunk:
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1617797&r1=1617796&r2=1617797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 13 18:40:28 2014
@@ -245,9 +245,6 @@ Release 2.6.0 - UNRELEASED
HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
block replica (Arpit Agarwal)
- HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate
- responses to Balancer (vinayakumarb)
-
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1617797&r1=1617796&r2=1617797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Wed Aug 13 18:40:28 2014
@@ -87,6 +87,8 @@ public class Dispatcher {
private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
+ private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20
+ // minutes
private final NameNodeConnector nnc;
private final SaslDataTransferClient saslClient;
@@ -276,6 +278,13 @@ public class Dispatcher {
sock.connect(
NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
+ /*
+ * Unfortunately we don't have a good way to know if the Datanode is
+ * taking a really long time to move a block, OR something has gone
+ * wrong and it's never going to finish. To deal with this scenario, we
+ * set a long timeout (20 minutes) to avoid hanging indefinitely.
+ */
+ sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
sock.setKeepAlive(true);
@@ -332,12 +341,8 @@ public class Dispatcher {
/** Receive a block copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
- BlockOpResponseProto response =
- BlockOpResponseProto.parseFrom(vintPrefixed(in));
- while (response.getStatus() == Status.IN_PROGRESS) {
- // read intermediate responses
- response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
- }
+ BlockOpResponseProto response = BlockOpResponseProto
+ .parseFrom(vintPrefixed(in));
if (response.getStatus() != Status.SUCCESS) {
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new IOException("block move failed due to access token error");
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1617797&r1=1617796&r2=1617797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Aug 13 18:40:28 2014
@@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -124,14 +123,6 @@ class BlockReceiver implements Closeable
private boolean syncOnClose;
private long restartBudget;
- /**
- * for replaceBlock response
- */
- private final long responseInterval;
- private long lastResponseTime = 0;
- private boolean isReplaceBlock = false;
- private DataOutputStream replyOut = null;
-
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
final String inAddr, final String myAddr,
@@ -153,9 +144,6 @@ class BlockReceiver implements Closeable
this.isClient = !this.isDatanode;
this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
- // For replaceBlock() calls response should be sent to avoid socketTimeout
- // at clients. So sending with the interval of 0.5 * socketTimeout
- this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
//for datanode, we have
//1: clientName.length() == 0, and
//2: stage == null or PIPELINE_SETUP_CREATE
@@ -663,20 +651,6 @@ class BlockReceiver implements Closeable
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
- /*
- * Send in-progress responses for the replaceBlock() calls back to caller to
- * avoid timeouts due to balancer throttling. HDFS-6247
- */
- if (isReplaceBlock
- && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
- BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
- .setStatus(Status.IN_PROGRESS);
- response.build().writeDelimitedTo(replyOut);
- replyOut.flush();
-
- lastResponseTime = Time.monotonicNow();
- }
-
if (throttler != null) { // throttle I/O
throttler.throttle(len);
}
@@ -744,8 +718,7 @@ class BlockReceiver implements Closeable
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
- DatanodeInfo[] downstreams,
- boolean isReplaceBlock) throws IOException {
+ DatanodeInfo[] downstreams) throws IOException {
syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
@@ -753,9 +726,6 @@ class BlockReceiver implements Closeable
mirrorAddr = mirrAddr;
throttler = throttlerArg;
- this.replyOut = replyOut;
- this.isReplaceBlock = isReplaceBlock;
-
try {
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1617797&r1=1617796&r2=1617797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 13 18:40:28 2014
@@ -708,7 +708,7 @@ class DataXceiver extends Receiver imple
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets, false);
+ mirrorAddr, null, targets);
// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
@@ -983,7 +983,7 @@ class DataXceiver extends Receiver imple
String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
- DataOutputStream replyOut = new DataOutputStream(getOutputStream());
+
try {
// get the output stream to the proxy
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1040,8 +1040,8 @@ class DataXceiver extends Receiver imple
CachingStrategy.newDropBehind());
// receive a block
- blockReceiver.receiveBlock(null, null, replyOut, null,
- dataXceiverServer.balanceThrottler, null, true);
+ blockReceiver.receiveBlock(null, null, null, null,
+ dataXceiverServer.balanceThrottler, null);
// notify name node
datanode.notifyNamenodeReceivedBlock(
@@ -1076,7 +1076,6 @@ class DataXceiver extends Receiver imple
IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
IOUtils.closeStream(proxyReply);
- IOUtils.closeStream(replyOut);
}
//update metrics
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1617797&r1=1617796&r2=1617797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Wed Aug 13 18:40:28 2014
@@ -207,7 +207,6 @@ enum Status {
OOB_RESERVED1 = 9; // Reserved
OOB_RESERVED2 = 10; // Reserved
OOB_RESERVED3 = 11; // Reserved
- IN_PROGRESS = 12;
}
message PipelineAckProto {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1617797&r1=1617796&r2=1617797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Wed Aug 13 18:40:28 2014
@@ -272,10 +272,8 @@ public class TestBlockReplacement {
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
- BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
- while (proto.getStatus() == Status.IN_PROGRESS) {
- proto = BlockOpResponseProto.parseDelimitedFrom(reply);
- }
+ BlockOpResponseProto proto =
+ BlockOpResponseProto.parseDelimitedFrom(reply);
return proto.getStatus() == Status.SUCCESS;
}