You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2016/01/08 18:15:28 UTC
hadoop git commit: HDFS-9574. Reduce client failures during datanode
restart. Contributed by Kihwal Lee. (cherry picked from commit
38c4c14472996562eb3d610649246770c2888c6b)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 20bb5c403 -> 8d04b7c27
HDFS-9574. Reduce client failures during datanode restart. Contributed by Kihwal Lee.
(cherry picked from commit 38c4c14472996562eb3d610649246770c2888c6b)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8d04b7c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8d04b7c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8d04b7c2
Branch: refs/heads/branch-2
Commit: 8d04b7c272c1c1ecc536a14569ce22022b5a05e1
Parents: 20bb5c4
Author: Kihwal Lee <ki...@apache.org>
Authored: Fri Jan 8 11:15:12 2016 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Fri Jan 8 11:15:12 2016 -0600
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSInputStream.java | 59 ++++++++--
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +
.../hadoop/hdfs/server/datanode/DNConf.java | 12 ++
.../hadoop/hdfs/server/datanode/DataNode.java | 11 +-
.../server/datanode/DataNodeFaultInjector.java | 2 +
.../hdfs/server/datanode/DataXceiver.java | 109 +++++++++++--------
.../src/main/resources/hdfs-default.xml | 10 ++
.../TestDataXceiverLazyPersistHint.java | 6 +-
.../fsdataset/impl/TestDatanodeRestart.java | 72 ++++++++++++
10 files changed, 224 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 92a33eb..f4dad12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -29,6 +29,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -73,10 +74,12 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore;
+import org.apache.hadoop.util.StopWatch;
import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
@@ -358,12 +361,18 @@ public class DFSInputStream extends FSInputStream
int replicaNotFoundCount = locatedblock.getLocations().length;
final DfsClientConf conf = dfsClient.getConf();
- for(DatanodeInfo datanode : locatedblock.getLocations()) {
+ final int timeout = conf.getSocketTimeout();
+ LinkedList<DatanodeInfo> nodeList = new LinkedList<DatanodeInfo>(
+ Arrays.asList(locatedblock.getLocations()));
+ LinkedList<DatanodeInfo> retryList = new LinkedList<DatanodeInfo>();
+ boolean isRetry = false;
+ StopWatch sw = new StopWatch();
+ while (nodeList.size() > 0) {
+ DatanodeInfo datanode = nodeList.pop();
ClientDatanodeProtocol cdp = null;
-
try {
cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
- dfsClient.getConfiguration(), conf.getSocketTimeout(),
+ dfsClient.getConfiguration(), timeout,
conf.isConnectToDnViaHostname(), locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -371,15 +380,19 @@ public class DFSInputStream extends FSInputStream
if (n >= 0) {
return n;
}
- }
- catch(IOException ioe) {
- if (ioe instanceof RemoteException &&
- (((RemoteException) ioe).unwrapRemoteException() instanceof
- ReplicaNotFoundException)) {
- // special case : replica might not be on the DN, treat as 0 length
- replicaNotFoundCount--;
+ } catch (IOException ioe) {
+ if (ioe instanceof RemoteException) {
+ if (((RemoteException) ioe).unwrapRemoteException() instanceof
+ ReplicaNotFoundException) {
+ // replica is not on the DN. We will treat it as 0 length
+ // if no one actually has a replica.
+ replicaNotFoundCount--;
+ } else if (((RemoteException) ioe).unwrapRemoteException() instanceof
+ RetriableException) {
+ // add to the list to be retried if necessary.
+ retryList.add(datanode);
+ }
}
-
DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}"
+ " for block {}", datanode, locatedblock.getBlock(), ioe);
} finally {
@@ -387,6 +400,30 @@ public class DFSInputStream extends FSInputStream
RPC.stopProxy(cdp);
}
}
+
+ // Ran out of nodes, but there are retriable nodes.
+ if (nodeList.size() == 0 && retryList.size() > 0) {
+ nodeList.addAll(retryList);
+ retryList.clear();
+ isRetry = true;
+ }
+
+ if (isRetry) {
+ // start the stop watch if not already running.
+ if (!sw.isRunning()) {
+ sw.start();
+ }
+ try {
+ Thread.sleep(500); // delay between retries.
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while getting the length.");
+ }
+ }
+
+ // see if we ran out of retry time
+ if (sw.isRunning() && sw.now(TimeUnit.MILLISECONDS) > timeout) {
+ break;
+ }
}
// Namenode told us about these locations, but none know about the replica
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6bca0bf..e1434c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1698,6 +1698,8 @@ Release 2.7.3 - UNRELEASED
HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
retry policy. (Eric Payne via kihwal)
+ HDFS-9574. Reduce client failures during datanode restart. (kihwal)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e73aa2d..7450730 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -492,6 +492,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "2.1.0-beta";
public static final String DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class";
+ // Maximum time, in seconds, that a datanode waits for a block pool to be
+ // registered before failing a data transfer request.
+ public static final String DFS_DATANODE_BP_READY_TIMEOUT_KEY = "dfs.datanode.bp-ready.timeout";
+ public static final long DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT = 20;
public static final String DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index bd4943d..fe80efe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -52,6 +52,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -104,6 +106,8 @@ public class DNConf {
final long maxLockedMemory;
+ private final long bpReadyTimeout;
+
// Allow LAZY_PERSIST writes from non-local clients?
private final boolean allowNonLocalLazyPersist;
@@ -210,6 +214,10 @@ public class DNConf {
this.allowNonLocalLazyPersist = conf.getBoolean(
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
+
+ this.bpReadyTimeout = conf.getLong(
+ DFS_DATANODE_BP_READY_TIMEOUT_KEY,
+ DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT);
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -295,4 +303,8 @@ public class DNConf {
public int getTransferSocketSendBufferSize() {
return transferSocketSendBufferSize;
}
+
+ /**
+ * @return the maximum time, in seconds, to wait for a block pool to be
+ * registered before failing a request; read from
+ * {@code dfs.datanode.bp-ready.timeout}.
+ */
+ public long getBpReadyTimeout() {
+ return bpReadyTimeout;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b247500..9e97b47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1594,6 +1594,7 @@ public class DataNode extends ReconfigurableBase
@VisibleForTesting
public DatanodeRegistration getDNRegistrationForBP(String bpid)
throws IOException {
+ DataNodeFaultInjector.get().noRegistration();
BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos==null || bpos.bpRegistration==null) {
throw new IOException("cannot find BPOfferService for bpid="+bpid);
@@ -1721,7 +1722,6 @@ public class DataNode extends ReconfigurableBase
throw new ShortCircuitFdsUnsupportedException(
fileDescriptorPassingDisabledReason);
}
- checkBlockToken(blk, token, BlockTokenIdentifier.AccessMode.READ);
int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
if (maxVersion < blkVersion) {
throw new ShortCircuitFdsVersionException("Your client is too old " +
@@ -2707,6 +2707,15 @@ public class DataNode extends ReconfigurableBase
}
private void checkReadAccess(final ExtendedBlock block) throws IOException {
+ // Make sure this node has registered for the block pool.
+ try {
+ getDNRegistrationForBP(block.getBlockPoolId());
+ } catch (IOException e) {
+ // if it has not registered with the NN, throw an exception back.
+ throw new org.apache.hadoop.ipc.RetriableException(
+ "Datanode not registered. Try again later.");
+ }
+
if (isBlockTokenEnabled) {
Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
.getTokenIdentifiers();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 46ec3ae..0e38694 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -49,4 +49,6 @@ public class DataNodeFaultInjector {
public boolean dropHeartbeatPacket() {
return false;
}
+
+ /**
+ * Fault-injection hook invoked at the start of
+ * DataNode#getDNRegistrationForBP. Does nothing by default; tests override
+ * it to throw an IOException and thereby simulate a datanode whose block
+ * pool registration cannot be found.
+ */
+ public void noRegistration() throws IOException { }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a54214f..83ebb50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -45,6 +45,7 @@ import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest;
import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
@@ -85,6 +86,7 @@ import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StopWatch;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
@@ -298,6 +300,9 @@ class DataXceiver extends Receiver implements Runnable {
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException {
updateCurrentThreadName("Passing file descriptors for block " + blk);
+ DataOutputStream out = getBufferedOutputStream();
+ checkAccess(out, true, blk, token,
+ Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ);
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
FileInputStream fis[] = null;
SlotId registeredSlotId = null;
@@ -326,9 +331,6 @@ class DataXceiver extends Receiver implements Runnable {
} catch (ShortCircuitFdsUnsupportedException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setMessage(e.getMessage());
- } catch (InvalidToken e) {
- bld.setStatus(ERROR_ACCESS_TOKEN);
- bld.setMessage(e.getMessage());
} catch (IOException e) {
bld.setStatus(ERROR);
bld.setMessage(e.getMessage());
@@ -516,9 +518,9 @@ class DataXceiver extends Receiver implements Runnable {
final CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientName;
long read = 0;
+ updateCurrentThreadName("Sending block " + block);
OutputStream baseStream = getOutputStream();
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- baseStream, smallBufferSize));
+ DataOutputStream out = getBufferedOutputStream();
checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
@@ -534,7 +536,6 @@ class DataXceiver extends Receiver implements Runnable {
: dnR + " Served block " + block + " to " +
remoteAddress;
- updateCurrentThreadName("Sending block " + block);
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
@@ -630,6 +631,10 @@ class DataXceiver extends Receiver implements Runnable {
allowLazyPersist = allowLazyPersist &&
(dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
long size = 0;
+ // reply to upstream datanode or client
+ final DataOutputStream replyOut = getBufferedOutputStream();
+ checkAccess(replyOut, isClient, block, blockToken,
+ Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
// check single target for transfer-RBW/Finalized
if (isTransfer && targets.length > 0) {
throw new IOException(stage + " does not support multiple targets "
@@ -660,11 +665,6 @@ class DataXceiver extends Receiver implements Runnable {
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
+ localAddress);
- // reply to upstream datanode or client
- final DataOutputStream replyOut = getBufferedOutputStream();
- checkAccess(replyOut, isClient, block, blockToken,
- Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
-
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
@@ -863,13 +863,13 @@ class DataXceiver extends Receiver implements Runnable {
final String clientName,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes) throws IOException {
- checkAccess(socketOut, true, blk, blockToken,
- Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
previousOpClientName = clientName;
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
final DataOutputStream out = new DataOutputStream(
getOutputStream());
+ checkAccess(out, true, blk, blockToken,
+ Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
try {
datanode.transferReplicaForPipelineRecovery(blk, targets,
targetStorageTypes, clientName);
@@ -923,6 +923,7 @@ class DataXceiver extends Receiver implements Runnable {
@Override
public void blockChecksum(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
+ updateCurrentThreadName("Getting checksum for block " + block);
final DataOutputStream out = new DataOutputStream(
getOutputStream());
checkAccess(out, true, block, blockToken,
@@ -933,13 +934,11 @@ class DataXceiver extends Receiver implements Runnable {
long visibleLength = datanode.data.getReplicaVisibleLength(block);
boolean partialBlk = requestLength < visibleLength;
- updateCurrentThreadName("Reading metadata for block " + block);
final LengthInputStream metadataIn = datanode.data
.getMetaDataInputStream(block);
final DataInputStream checksumIn = new DataInputStream(
new BufferedInputStream(metadataIn, ioFileBufferSize));
- updateCurrentThreadName("Getting checksum for block " + block);
try {
//read metadata file
final BlockMetadataHeader header = BlockMetadataHeader
@@ -987,21 +986,10 @@ class DataXceiver extends Receiver implements Runnable {
public void copyBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
updateCurrentThreadName("Copying block " + block);
- // Read in the header
- if (datanode.isBlockTokenEnabled) {
- try {
- datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
- BlockTokenIdentifier.AccessMode.COPY);
- } catch (InvalidToken e) {
- LOG.warn("Invalid access token in request from " + remoteAddress
- + " for OP_COPY_BLOCK for block " + block + " : "
- + e.getLocalizedMessage());
- sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
- return;
- }
+ DataOutputStream reply = getBufferedOutputStream();
+ checkAccess(reply, true, block, blockToken,
+ Op.COPY_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
- }
-
if (datanode.data.getPinning(block)) {
String msg = "Not able to copy block " + block.getBlockId() + " " +
"to " + peer.getRemoteAddressString() + " because it's pinned ";
@@ -1019,7 +1007,6 @@ class DataXceiver extends Receiver implements Runnable {
}
BlockSender blockSender = null;
- DataOutputStream reply = null;
boolean isOpSuccess = true;
try {
@@ -1027,10 +1014,7 @@ class DataXceiver extends Receiver implements Runnable {
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
null, CachingStrategy.newDropBehind());
- // set up response stream
OutputStream baseStream = getOutputStream();
- reply = new DataOutputStream(new BufferedOutputStream(
- baseStream, smallBufferSize));
// send status first
writeSuccessWithChecksumInfo(blockSender, reply);
@@ -1074,20 +1058,9 @@ class DataXceiver extends Receiver implements Runnable {
final String delHint,
final DatanodeInfo proxySource) throws IOException {
updateCurrentThreadName("Replacing block " + block + " from " + delHint);
-
- /* read header */
- if (datanode.isBlockTokenEnabled) {
- try {
- datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
- BlockTokenIdentifier.AccessMode.REPLACE);
- } catch (InvalidToken e) {
- LOG.warn("Invalid access token in request from " + remoteAddress
- + " for OP_REPLACE_BLOCK for block " + block + " : "
- + e.getLocalizedMessage());
- sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
- return;
- }
- }
+ DataOutputStream replyOut = new DataOutputStream(getOutputStream());
+ checkAccess(replyOut, true, block, blockToken,
+ Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE);
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
String msg = "Not able to receive block " + block.getBlockId() +
@@ -1104,7 +1077,6 @@ class DataXceiver extends Receiver implements Runnable {
String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
- DataOutputStream replyOut = new DataOutputStream(getOutputStream());
boolean IoeDuringCopyBlockOperation = false;
try {
// Move the block to different storage in the same datanode
@@ -1296,11 +1268,52 @@ class DataXceiver extends Receiver implements Runnable {
datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
}
+  /**
+   * Wait until the block pool of the requested block is registered, up to
+   * the configured amount of time ({@link DNConf#getBpReadyTimeout()}, in
+   * seconds). A timeout of 0 or less fails right away if the block pool is
+   * not registered. A failure here should fail the client request.
+   *
+   * @param block the requested block; only its block pool id is used
+   * @throws IOException if the block pool is still not registered when the
+   *         timeout expires, or if the thread is interrupted while waiting
+   */
+  void checkAndWaitForBP(final ExtendedBlock block)
+      throws IOException {
+    String bpId = block.getBlockPoolId();
+
+    // The registration is only missing in relatively short time window.
+    // Optimistically perform this first.
+    try {
+      datanode.getDNRegistrationForBP(bpId);
+      return;
+    } catch (IOException ioe) {
+      // not registered
+    }
+
+    // Retry until the deadline. The deadline is tracked in milliseconds:
+    // comparing truncated whole seconds would let the wait overshoot the
+    // timeout by up to a second, and a timeout of 0 would still wait ~1s.
+    // Negative timeouts are clamped to 0 to avoid overflow below.
+    final long timeoutMs = TimeUnit.SECONDS.toMillis(
+        Math.max(0L, dnConf.getBpReadyTimeout()));
+    StopWatch sw = new StopWatch();
+    sw.start();
+    while (true) {
+      long remainingMs = timeoutMs - sw.now(TimeUnit.MILLISECONDS);
+      if (remainingMs <= 0) {
+        break;
+      }
+      // sleep before trying again, but never past the deadline.
+      try {
+        Thread.sleep(Math.min(1000L, remainingMs));
+      } catch (InterruptedException ie) {
+        // Restore the interrupt status so callers can still observe it.
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while serving request. Aborting.",
+            ie);
+      }
+      try {
+        datanode.getDNRegistrationForBP(bpId);
+        return;
+      } catch (IOException ioe) {
+        // not registered yet; keep waiting.
+      }
+    }
+    // failed to obtain registration.
+    throw new IOException("Not ready to serve the block pool, " + bpId + ".");
+  }
+
private void checkAccess(OutputStream out, final boolean reply,
final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t,
final Op op,
final BlockTokenIdentifier.AccessMode mode) throws IOException {
+ checkAndWaitForBP(blk);
if (datanode.isBlockTokenEnabled) {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking block access token for block '" + blk.getBlockId()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index ff38fcb..4c56326 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2704,4 +2704,14 @@
</description>
</property>
+<property>
+ <name>dfs.datanode.bp-ready.timeout</name>
+ <value>20</value>
+ <description>
+ The maximum wait time, in seconds, for the datanode to be ready before
+ failing the received request. Setting this to 0 fails requests right away
+ if the datanode is not yet registered with the namenode. This wait time
+ reduces initial request failures after a datanode restart.
+ </description>
+</property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
index d8a7188..3af959c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.net.*;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.DataChecksum;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
@@ -162,17 +163,20 @@ public class TestDataXceiverLazyPersistHint {
return peer;
}
- private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) {
+ /**
+ * Build a mocked DataNode backed by a real DNConf that carries the
+ * requested non-local LAZY_PERSIST setting. Declares IOException because
+ * stubbing getDNRegistrationForBP() requires calling a method that
+ * throws it.
+ */
+ private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist)
+ throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED);
DNConf dnConf = new DNConf(conf);
+ DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
DataNode mockDn = mock(DataNode.class);
when(mockDn.getDnConf()).thenReturn(dnConf);
when(mockDn.getConf()).thenReturn(conf);
when(mockDn.getMetrics()).thenReturn(mockMetrics);
+ // DataXceiver now checks block pool registration before serving a
+ // request; stub it so that check passes. NOTE(review): assumes the test's
+ // blocks use block pool id "Dummy-pool" -- confirm against the test setup.
+ when(mockDn.getDNRegistrationForBP("Dummy-pool")).thenReturn(mockDnReg);
return mockDn;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d04b7c2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
index 8bbac9f..40a3d9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -146,4 +147,75 @@ public class TestDatanodeRestart {
private static FsDatasetImpl dataset(DataNode dn) {
return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
}
+
+  /**
+   * Verifies that, while the datanode cannot find its block pool
+   * registration, requests are held for dfs.datanode.bp-ready.timeout
+   * seconds and then failed. This is checked on the write path and when
+   * opening an under-construction file for read. Requests succeed again
+   * once the registration is visible.
+   */
+  @Test
+  public void testWaitForRegistrationOnRestart() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 5);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+
+    // This makes the datanode appear registered to the NN, but it won't be
+    // able to get to the saved dn reg internally.
+    DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+      @Override
+      public void noRegistration() throws IOException {
+        throw new IOException("no reg found for testing");
+      }
+    };
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(dnFaultInjector);
+    MiniDFSCluster cluster = null;
+    long start = 0;
+    Path file = new Path("/reg");
+    try {
+      int numDNs = 1;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+      cluster.waitActive();
+
+      start = System.currentTimeMillis();
+      FileSystem fileSys = cluster.getFileSystem();
+      try {
+        DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
+        // It is a bug if this does not fail.
+        throw new AssertionError("Did not fail!");
+      } catch (org.apache.hadoop.ipc.RemoteException e) {
+        long elapsed = System.currentTimeMillis() - start;
+        // timers have at-least semantics, so it should be at least 5 seconds.
+        if (elapsed < 5000 || elapsed > 10000) {
+          throw new IOException(elapsed + " milliseconds passed.", e);
+        }
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      // this should succeed now.
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
+
+      // turn it back to under-construction, so that the client calls
+      // getReplicaVisibleLength() rpc method against the datanode.
+      // NOTE: the returned stream is left open, presumably so the file
+      // stays under construction; cluster shutdown cleans it up.
+      fileSys.append(file);
+      // back to simulating unregistered node.
+      DataNodeFaultInjector.set(dnFaultInjector);
+      byte[] buffer = new byte[8];
+      start = System.currentTimeMillis();
+      // The catch clause also covers a failure thrown by open() itself.
+      try (org.apache.hadoop.fs.FSDataInputStream in = fileSys.open(file)) {
+        in.read(0L, buffer, 0, 1);
+        // An AssertionError is not an IOException, so the catch below
+        // cannot mistake it for the expected failure.
+        throw new AssertionError("Did not fail!");
+      } catch (IOException e) {
+        long elapsed = System.currentTimeMillis() - start;
+        String msg = e.getMessage();
+        if (msg != null && msg.contains("readBlockLength")) {
+          throw new IOException("Failed, but with unexpected exception:", e);
+        }
+        // timers have at-least semantics, so it should be at least 5 seconds.
+        if (elapsed < 5000 || elapsed > 10000) {
+          throw new IOException(elapsed + " milliseconds passed.", e);
+        }
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      try (org.apache.hadoop.fs.FSDataInputStream in = fileSys.open(file)) {
+        in.read(0L, buffer, 0, 1);
+      }
+    } finally {
+      DataNodeFaultInjector.set(oldDnInjector);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
}