Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/01/12 05:32:01 UTC

[02/23] hadoop git commit: HDFS-9574. Reduce client failures during datanode restart. Contributed by Kihwal Lee.

HDFS-9574. Reduce client failures during datanode restart. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/38c4c144
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/38c4c144
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/38c4c144

Branch: refs/heads/HDFS-1312
Commit: 38c4c14472996562eb3d610649246770c2888c6b
Parents: ed18527
Author: Kihwal Lee <ki...@apache.org>
Authored: Fri Jan 8 11:13:25 2016 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Fri Jan 8 11:13:58 2016 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  59 ++++++++--
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../hadoop/hdfs/server/datanode/DNConf.java     |  12 ++
 .../hadoop/hdfs/server/datanode/DataNode.java   |  11 +-
 .../server/datanode/DataNodeFaultInjector.java  |   2 +
 .../hdfs/server/datanode/DataXceiver.java       | 109 +++++++++++--------
 .../src/main/resources/hdfs-default.xml         |  10 ++
 .../TestDataXceiverLazyPersistHint.java         |   6 +-
 .../fsdataset/impl/TestDatanodeRestart.java     |  72 ++++++++++++
 10 files changed, 224 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6823c1f..b6b11ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -29,6 +29,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -72,10 +73,12 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
+import org.apache.hadoop.util.StopWatch;
 import org.apache.htrace.core.SpanId;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
@@ -357,12 +360,18 @@ public class DFSInputStream extends FSInputStream
     int replicaNotFoundCount = locatedblock.getLocations().length;
 
     final DfsClientConf conf = dfsClient.getConf();
-    for(DatanodeInfo datanode : locatedblock.getLocations()) {
+    final int timeout = conf.getSocketTimeout();
+    LinkedList<DatanodeInfo> nodeList = new LinkedList<DatanodeInfo>(
+        Arrays.asList(locatedblock.getLocations()));
+    LinkedList<DatanodeInfo> retryList = new LinkedList<DatanodeInfo>();
+    boolean isRetry = false;
+    StopWatch sw = new StopWatch();
+    while (nodeList.size() > 0) {
+      DatanodeInfo datanode = nodeList.pop();
       ClientDatanodeProtocol cdp = null;
-
       try {
         cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
-            dfsClient.getConfiguration(), conf.getSocketTimeout(),
+            dfsClient.getConfiguration(), timeout,
             conf.isConnectToDnViaHostname(), locatedblock);
 
         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -370,15 +379,19 @@ public class DFSInputStream extends FSInputStream
         if (n >= 0) {
           return n;
         }
-      }
-      catch(IOException ioe) {
-        if (ioe instanceof RemoteException &&
-            (((RemoteException) ioe).unwrapRemoteException() instanceof
-                ReplicaNotFoundException)) {
-          // special case : replica might not be on the DN, treat as 0 length
-          replicaNotFoundCount--;
+      } catch (IOException ioe) {
+        if (ioe instanceof RemoteException) {
+          if (((RemoteException) ioe).unwrapRemoteException() instanceof
+              ReplicaNotFoundException) {
+            // replica is not on the DN. We will treat it as 0 length
+            // if no one actually has a replica.
+            replicaNotFoundCount--;
+          } else if (((RemoteException) ioe).unwrapRemoteException() instanceof
+              RetriableException) {
+            // add to the list to be retried if necessary.
+            retryList.add(datanode);
+          }
         }
-
         DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}"
               + " for block {}", datanode, locatedblock.getBlock(), ioe);
       } finally {
@@ -386,6 +399,30 @@ public class DFSInputStream extends FSInputStream
           RPC.stopProxy(cdp);
         }
       }
+
+      // Ran out of nodes, but there are retriable nodes.
+      if (nodeList.size() == 0 && retryList.size() > 0) {
+        nodeList.addAll(retryList);
+        retryList.clear();
+        isRetry = true;
+      }
+
+      if (isRetry) {
+        // start the stop watch if not already running.
+        if (!sw.isRunning()) {
+          sw.start();
+        }
+        try {
+          Thread.sleep(500); // delay between retries.
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted while getting the length.");
+        }
+      }
+
+      // see if we ran out of retry time
+      if (sw.isRunning() && sw.now(TimeUnit.MILLISECONDS) > timeout) {
+        break;
+      }
     }
 
     // Namenode told us about these locations, but none know about the replica

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c451ea6..887ddef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2581,6 +2581,8 @@ Release 2.7.3 - UNRELEASED
     HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
     retry policy. (Eric Payne via kihwal)
 
+    HDFS-9574. Reduce client failures during datanode restart (kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8707065..22859ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -507,6 +507,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
   public static final String  DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class";
+  public static final String  DFS_DATANODE_BP_READY_TIMEOUT_KEY = "dfs.datanode.bp-ready.timeout";
+  public static final long    DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT = 20;
 
   public static final String  DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
   public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index b3cb48b..0f84fc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -52,6 +52,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -104,6 +106,8 @@ public class DNConf {
 
   final long maxLockedMemory;
 
+  private final long bpReadyTimeout;
+
   // Allow LAZY_PERSIST writes from non-local clients?
   private final boolean allowNonLocalLazyPersist;
 
@@ -210,6 +214,10 @@ public class DNConf {
     this.allowNonLocalLazyPersist = conf.getBoolean(
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
+
+    this.bpReadyTimeout = conf.getLong(
+        DFS_DATANODE_BP_READY_TIMEOUT_KEY,
+        DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT);
   }
 
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -322,4 +330,8 @@ public class DNConf {
   public int getTransferSocketSendBufferSize() {
     return transferSocketSendBufferSize;
   }
+
+  public long getBpReadyTimeout() {
+    return bpReadyTimeout;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 241f1e5..6cd47ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1594,6 +1594,7 @@ public class DataNode extends ReconfigurableBase
   @VisibleForTesting
   public DatanodeRegistration getDNRegistrationForBP(String bpid) 
   throws IOException {
+    DataNodeFaultInjector.get().noRegistration();
     BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos==null || bpos.bpRegistration==null) {
       throw new IOException("cannot find BPOfferService for bpid="+bpid);
@@ -1721,7 +1722,6 @@ public class DataNode extends ReconfigurableBase
       throw new ShortCircuitFdsUnsupportedException(
           fileDescriptorPassingDisabledReason);
     }
-    checkBlockToken(blk, token, BlockTokenIdentifier.AccessMode.READ);
     int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
     if (maxVersion < blkVersion) {
       throw new ShortCircuitFdsVersionException("Your client is too old " +
@@ -2709,6 +2709,15 @@ public class DataNode extends ReconfigurableBase
   }
 
   private void checkReadAccess(final ExtendedBlock block) throws IOException {
+    // Make sure this node has registered for the block pool.
+    try {
+      getDNRegistrationForBP(block.getBlockPoolId());
+    } catch (IOException e) {
+      // if it has not registered with the NN, throw an exception back.
+      throw new org.apache.hadoop.ipc.RetriableException(
+          "Datanode not registered. Try again later.");
+    }
+
     if (isBlockTokenEnabled) {
       Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
           .getTokenIdentifiers();
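
For context only (not part of the patch): the RetriableException thrown above reaches the client wrapped in a RemoteException, so a caller has to unwrap it before deciding whether to retry, which is exactly what the DFSInputStream change earlier in this commit does. A minimal sketch of that check, using only the two exception classes the commit already references; the helper name isRetriableDatanodeError is made up for illustration:

  import java.io.IOException;
  import org.apache.hadoop.ipc.RemoteException;
  import org.apache.hadoop.ipc.RetriableException;

  // Returns true when a failed datanode RPC indicates "datanode not yet
  // registered", i.e. the condition that readBlockLength() above queues
  // for a later retry instead of failing the read outright.
  static boolean isRetriableDatanodeError(IOException ioe) {
    if (ioe instanceof RemoteException) {
      IOException unwrapped = ((RemoteException) ioe).unwrapRemoteException();
      return unwrapped instanceof RetriableException;
    }
    return false;
  }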

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 46ec3ae..0e38694 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -49,4 +49,6 @@ public class DataNodeFaultInjector {
   public boolean dropHeartbeatPacket() {
     return false;
   }
+
+  public void noRegistration() throws IOException { }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 94ce636..190e69c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -45,6 +45,7 @@ import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
 import java.security.MessageDigest;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
@@ -85,6 +86,7 @@ import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StopWatch;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
@@ -298,6 +300,9 @@ class DataXceiver extends Receiver implements Runnable {
       SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
         throws IOException {
     updateCurrentThreadName("Passing file descriptors for block " + blk);
+    DataOutputStream out = getBufferedOutputStream();
+    checkAccess(out, true, blk, token,
+        Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ);
     BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
     FileInputStream fis[] = null;
     SlotId registeredSlotId = null;
@@ -326,9 +331,6 @@ class DataXceiver extends Receiver implements Runnable {
       } catch (ShortCircuitFdsUnsupportedException e) {
         bld.setStatus(ERROR_UNSUPPORTED);
         bld.setMessage(e.getMessage());
-      } catch (InvalidToken e) {
-        bld.setStatus(ERROR_ACCESS_TOKEN);
-        bld.setMessage(e.getMessage());
       } catch (IOException e) {
         bld.setStatus(ERROR);
         bld.setMessage(e.getMessage());
@@ -516,9 +518,9 @@ class DataXceiver extends Receiver implements Runnable {
       final CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientName;
     long read = 0;
+    updateCurrentThreadName("Sending block " + block);
     OutputStream baseStream = getOutputStream();
-    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        baseStream, smallBufferSize));
+    DataOutputStream out = getBufferedOutputStream();
     checkAccess(out, true, block, blockToken,
         Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
   
@@ -534,7 +536,6 @@ class DataXceiver extends Receiver implements Runnable {
         : dnR + " Served block " + block + " to " +
             remoteAddress;
 
-    updateCurrentThreadName("Sending block " + block);
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
@@ -630,6 +631,10 @@ class DataXceiver extends Receiver implements Runnable {
     allowLazyPersist = allowLazyPersist &&
         (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
     long size = 0;
+    // reply to upstream datanode or client 
+    final DataOutputStream replyOut = getBufferedOutputStream();
+    checkAccess(replyOut, isClient, block, blockToken,
+        Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
     // check single target for transfer-RBW/Finalized 
     if (isTransfer && targets.length > 0) {
       throw new IOException(stage + " does not support multiple targets "
@@ -660,11 +665,6 @@ class DataXceiver extends Receiver implements Runnable {
     LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
         + localAddress);
 
-    // reply to upstream datanode or client 
-    final DataOutputStream replyOut = getBufferedOutputStream();
-    checkAccess(replyOut, isClient, block, blockToken,
-        Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
-
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
     Socket mirrorSock = null;           // socket to next target
@@ -863,13 +863,13 @@ class DataXceiver extends Receiver implements Runnable {
       final String clientName,
       final DatanodeInfo[] targets,
       final StorageType[] targetStorageTypes) throws IOException {
-    checkAccess(socketOut, true, blk, blockToken,
-        Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
     previousOpClientName = clientName;
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
 
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
+    checkAccess(out, true, blk, blockToken,
+        Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets,
           targetStorageTypes, clientName);
@@ -923,6 +923,7 @@ class DataXceiver extends Receiver implements Runnable {
   @Override
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    updateCurrentThreadName("Getting checksum for block " + block);
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, block, blockToken,
@@ -933,13 +934,11 @@ class DataXceiver extends Receiver implements Runnable {
     long visibleLength = datanode.data.getReplicaVisibleLength(block);
     boolean partialBlk = requestLength < visibleLength;
 
-    updateCurrentThreadName("Reading metadata for block " + block);
     final LengthInputStream metadataIn = datanode.data
         .getMetaDataInputStream(block);
     
     final DataInputStream checksumIn = new DataInputStream(
         new BufferedInputStream(metadataIn, ioFileBufferSize));
-    updateCurrentThreadName("Getting checksum for block " + block);
     try {
       //read metadata file
       final BlockMetadataHeader header = BlockMetadataHeader
@@ -987,21 +986,10 @@ class DataXceiver extends Receiver implements Runnable {
   public void copyBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Copying block " + block);
-    // Read in the header
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
-            BlockTokenIdentifier.AccessMode.COPY);
-      } catch (InvalidToken e) {
-        LOG.warn("Invalid access token in request from " + remoteAddress
-            + " for OP_COPY_BLOCK for block " + block + " : "
-            + e.getLocalizedMessage());
-        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
-        return;
-      }
+    DataOutputStream reply = getBufferedOutputStream();
+    checkAccess(reply, true, block, blockToken,
+        Op.COPY_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
 
-    }
-    
     if (datanode.data.getPinning(block)) {
       String msg = "Not able to copy block " + block.getBlockId() + " " +
           "to " + peer.getRemoteAddressString() + " because it's pinned ";
@@ -1019,7 +1007,6 @@ class DataXceiver extends Receiver implements Runnable {
     }
 
     BlockSender blockSender = null;
-    DataOutputStream reply = null;
     boolean isOpSuccess = true;
 
     try {
@@ -1027,10 +1014,7 @@ class DataXceiver extends Receiver implements Runnable {
       blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, 
           null, CachingStrategy.newDropBehind());
 
-      // set up response stream
       OutputStream baseStream = getOutputStream();
-      reply = new DataOutputStream(new BufferedOutputStream(
-          baseStream, smallBufferSize));
 
       // send status first
       writeSuccessWithChecksumInfo(blockSender, reply);
@@ -1074,20 +1058,9 @@ class DataXceiver extends Receiver implements Runnable {
       final String delHint,
       final DatanodeInfo proxySource) throws IOException {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
-
-    /* read header */
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
-            BlockTokenIdentifier.AccessMode.REPLACE);
-      } catch (InvalidToken e) {
-        LOG.warn("Invalid access token in request from " + remoteAddress
-            + " for OP_REPLACE_BLOCK for block " + block + " : "
-            + e.getLocalizedMessage());
-        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
-        return;
-      }
-    }
+    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
+    checkAccess(replyOut, true, block, blockToken,
+        Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE);
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to receive block " + block.getBlockId() +
@@ -1104,7 +1077,6 @@ class DataXceiver extends Receiver implements Runnable {
     String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
-    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     boolean IoeDuringCopyBlockOperation = false;
     try {
       // Move the block to different storage in the same datanode
@@ -1296,11 +1268,52 @@ class DataXceiver extends Receiver implements Runnable {
     datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
   }
 
+  /**
+   * Wait until the BP is registered, up to the configured amount of time.
+   * Throws an exception if it times out, which should fail the client request.
+   * @param block the requested block
+   */
+  void checkAndWaitForBP(final ExtendedBlock block)
+      throws IOException {
+    String bpId = block.getBlockPoolId();
+
+    // The registration is only missing in a relatively short time window.
+    // Optimistically perform this first.
+    try {
+      datanode.getDNRegistrationForBP(bpId);
+      return;
+    } catch (IOException ioe) {
+      // not registered
+    }
+
+    // retry
+    long bpReadyTimeout = dnConf.getBpReadyTimeout();
+    StopWatch sw = new StopWatch();
+    sw.start();
+    while (sw.now(TimeUnit.SECONDS) <= bpReadyTimeout) {
+      try {
+        datanode.getDNRegistrationForBP(bpId);
+        return;
+      } catch (IOException ioe) {
+        // not registered
+      }
+      // sleep before trying again
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        throw new IOException("Interrupted while serving request. Aborting.");
+      }
+    }
+    // failed to obtain registration.
+    throw new IOException("Not ready to serve the block pool, " + bpId + ".");
+  }
+
   private void checkAccess(OutputStream out, final boolean reply, 
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
       final Op op,
       final BlockTokenIdentifier.AccessMode mode) throws IOException {
+    checkAndWaitForBP(blk);
     if (datanode.isBlockTokenEnabled) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Checking block access token for block '" + blk.getBlockId()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1fce33c..397c67b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2721,4 +2721,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.bp-ready.timeout</name>
+  <value>20</value>
+  <description>
+    The maximum wait time for the datanode to be ready before failing the
+    received request. Setting this to 0 fails requests right away if the
+    datanode is not yet registered with the namenode. This wait time
+    reduces initial request failures after datanode restart.
+  </description>
+</property>
 </configuration>
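
A usage note, not part of the patch: dfs.datanode.bp-ready.timeout is an ordinary DataNode setting, so besides hdfs-site.xml it can be set programmatically, for example when building a MiniDFSCluster as the test below does. A minimal sketch, assuming the standard HdfsConfiguration entry point:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;
  import org.apache.hadoop.hdfs.HdfsConfiguration;

  Configuration conf = new HdfsConfiguration();
  // Wait at most 5 seconds for the datanode to register before failing
  // incoming requests (the shipped default is 20; 0 fails immediately).
  conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 5);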

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
index d8a7188..3af959c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.net.*;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.datatransfer.*;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.DataChecksum;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
@@ -162,17 +163,20 @@ public class TestDataXceiverLazyPersistHint {
     return peer;
   }
 
-  private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) {
+  private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist)
+      throws IOException {
     Configuration conf = new HdfsConfiguration();
     conf.setBoolean(
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
         nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED);
     DNConf dnConf = new DNConf(conf);
+    DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
     DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
     DataNode mockDn = mock(DataNode.class);
     when(mockDn.getDnConf()).thenReturn(dnConf);
     when(mockDn.getConf()).thenReturn(conf);
     when(mockDn.getMetrics()).thenReturn(mockMetrics);
+    when(mockDn.getDNRegistrationForBP("Dummy-pool")).thenReturn(mockDnReg);
     return mockDn;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38c4c144/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
index 8bbac9f..40a3d9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -146,4 +147,75 @@ public class TestDatanodeRestart {
   private static FsDatasetImpl dataset(DataNode dn) {
     return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
   }
+
+  @Test
+  public void testWaitForRegistrationOnRestart() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 5);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+
+    // This makes the datanode appear registered to the NN, but it won't be
+    // able to get to the saved dn reg internally.
+    DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+      @Override
+      public void noRegistration() throws IOException {
+        throw new IOException("no reg found for testing");
+      }
+    };
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(dnFaultInjector);
+    MiniDFSCluster cluster = null;
+    long start = 0;
+    Path file = new Path("/reg");
+    try {
+      int numDNs = 1;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+      cluster.waitActive();
+
+      start = System.currentTimeMillis();
+      FileSystem fileSys = cluster.getFileSystem();
+      try {
+        DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
+        // It is a bug if this does not fail.
+        throw new IOException("Did not fail!");
+      } catch (org.apache.hadoop.ipc.RemoteException e) {
+        long elapsed = System.currentTimeMillis() - start;
+        // timers have at-least semantics, so it should be at least 5 seconds.
+        if (elapsed < 5000 || elapsed > 10000) {
+          throw new IOException(elapsed + " ms passed.", e);
+        }
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      // this should succeed now.
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
+
+      // turn it back to under-construction, so that the client calls
+      // getReplicaVisibleLength() rpc method against the datanode.
+      fileSys.append(file);
+      // back to simulating unregistered node.
+      DataNodeFaultInjector.set(dnFaultInjector);
+      byte[] buffer = new byte[8];
+      start = System.currentTimeMillis();
+      try {
+        fileSys.open(file).read(0L, buffer, 0, 1);
+        throw new IOException("Did not fail!");
+      } catch (IOException e) {
+        long elapsed = System.currentTimeMillis() - start;
+        if (e.getMessage().contains("readBlockLength")) {
+          throw new IOException("Failed, but with unexpected exception:", e);
+        }
+        // timers have at-least semantics, so it should be at least 5 seconds.
+        if (elapsed < 5000 || elapsed > 10000) {
+          throw new IOException(elapsed + " ms passed.", e);
+        }
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      fileSys.open(file).read(0L, buffer, 0, 1);
+    } finally {
+      DataNodeFaultInjector.set(oldDnInjector);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }