Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:09 UTC

svn commit: r1513258 [2/6] - in /hadoop/common/branches/YARN-321/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/tomcat/ROOT/ hadoop-hdfs-nfs/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/m...

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Mon Aug 12 21:25:49 2013
@@ -36,16 +36,10 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
@@ -66,8 +60,15 @@ import org.apache.hadoop.security.Refres
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
+import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
+import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
+import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -297,9 +298,8 @@ public class NameNodeProxies {
     return new ClientNamenodeProtocolTranslatorPB(proxy);
   }
   
-  @SuppressWarnings("unchecked")
   private static Object createNameNodeProxy(InetSocketAddress address,
-      Configuration conf, UserGroupInformation ugi, Class xface)
+      Configuration conf, UserGroupInformation ugi, Class<?> xface)
       throws IOException {
     RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
     Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
@@ -308,7 +308,8 @@ public class NameNodeProxies {
   }
 
   /** Gets the configured Failover proxy provider's class */
-  private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
+  @VisibleForTesting
+  public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
       Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
     if (nameNodeUri == null) {
       return null;
@@ -345,8 +346,8 @@ public class NameNodeProxies {
   }
 
   /** Creates the Failover proxy provider instance*/
-  @SuppressWarnings("unchecked")
-  private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
+  @VisibleForTesting
+  public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
       Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
       Class<T> xface, URI nameNodeUri) throws IOException {
     Preconditions.checkArgument(
@@ -355,9 +356,9 @@ public class NameNodeProxies {
     try {
       Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
           .getConstructor(Configuration.class, URI.class, Class.class);
-      FailoverProxyProvider<?> provider = ctor.newInstance(conf, nameNodeUri,
+      FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
           xface);
-      return (FailoverProxyProvider<T>) provider;
+      return provider;
     } catch (Exception e) {
       String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
       if (LOG.isDebugEnabled()) {

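The cleanup above is worth a closer look: by parameterizing the Class token, the reflective constructor call already yields a FailoverProxyProvider<T>, which is why both @SuppressWarnings("unchecked") annotations can go. A minimal, self-contained sketch of the same pattern; Provider, StringProvider, and ReflectiveFactory are illustrative stand-ins, not Hadoop classes:

    import java.lang.reflect.Constructor;

    // Illustrative stand-ins for FailoverProxyProvider<T> and an implementation.
    interface Provider<T> { T get(); }

    class StringProvider implements Provider<String> {
        private final String value;
        public StringProvider(String value) { this.value = value; }
        public String get() { return value; }
    }

    public class ReflectiveFactory {
        // Because the Class token carries the type parameter, newInstance()
        // is statically typed as Provider<T> -- no unchecked cast needed,
        // mirroring the change to createFailoverProxyProvider above.
        static <T> Provider<T> create(Class<? extends Provider<T>> cls, String arg)
                throws Exception {
            Constructor<? extends Provider<T>> ctor = cls.getConstructor(String.class);
            return ctor.newInstance(arg);
        }

        public static void main(String[] args) throws Exception {
            Provider<String> p = create(StringProvider.class, "hello");
            System.out.println(p.get());
        }
    }
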
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Mon Aug 12 21:25:49 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -381,13 +382,14 @@ public class RemoteBlockReader extends F
                                      int bufferSize, boolean verifyChecksum,
                                      String clientName, Peer peer,
                                      DatanodeID datanodeID,
-                                     PeerCache peerCache)
-                                     throws IOException {
+                                     PeerCache peerCache,
+                                     CachingStrategy cachingStrategy)
+                                       throws IOException {
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum);
+        verifyChecksum, cachingStrategy);
     
     //
     // Get bytes in block, set streams

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Mon Aug 12 21:25:49 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -375,12 +376,13 @@ public class RemoteBlockReader2  impleme
                                      boolean verifyChecksum,
                                      String clientName,
                                      Peer peer, DatanodeID datanodeID,
-                                     PeerCache peerCache) throws IOException {
+                                     PeerCache peerCache,
+                                     CachingStrategy cachingStrategy) throws IOException {
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
           peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum);
+        verifyChecksum, cachingStrategy);
 
     //
     // Get bytes in block

Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Mon Aug 12 21:25:49 2013
@@ -38,9 +38,9 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.AtMostOnce;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.KerberosInfo;
@@ -140,7 +140,7 @@ public interface ClientProtocol {
    * <p>
    * Blocks have a maximum size.  Clients that intend to create
    * multi-block files must also use 
-   * {@link #addBlock(String, String, ExtendedBlock, DatanodeInfo[])}
+   * {@link #addBlock}
    *
    * @param src path of the file being created.
    * @param masked masked permission.
@@ -171,7 +171,10 @@ public interface ClientProtocol {
    *
    * RuntimeExceptions:
    * @throws InvalidPathException Path <code>src</code> is invalid
+   * <p>
+   * <em>Note that create with {@link CreateFlag#OVERWRITE} is idempotent.</em>
    */
+  @AtMostOnce
   public HdfsFileStatus create(String src, FsPermission masked,
       String clientName, EnumSetWritable<CreateFlag> flag,
       boolean createParent, short replication, long blockSize)
@@ -205,6 +208,7 @@ public interface ClientProtocol {
    * RuntimeExceptions:
    * @throws UnsupportedOperationException if append is not supported
    */
+  @AtMostOnce
   public LocatedBlock append(String src, String clientName)
       throws AccessControlException, DSQuotaExceededException,
       FileNotFoundException, SafeModeException, UnresolvedLinkException,
@@ -275,8 +279,8 @@ public interface ClientProtocol {
 
   /**
    * The client can give up on a block by calling abandonBlock().
-   * The client can then
-   * either obtain a new block, or complete or abandon the file.
+   * The client can then either obtain a new block, or complete or abandon the 
+   * file.
    * Any partial writes to the block will be discarded.
    * 
    * @throws AccessControlException If access is denied
@@ -284,6 +288,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public void abandonBlock(ExtendedBlock b, String src, String holder)
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException;
@@ -409,6 +414,7 @@ public interface ClientProtocol {
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException an I/O error occurred 
    */
+  @AtMostOnce
   public boolean rename(String src, String dst) 
       throws UnresolvedLinkException, SnapshotAccessControlException, IOException;
 
@@ -422,6 +428,7 @@ public interface ClientProtocol {
    *           contains a symlink
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
+  @AtMostOnce
   public void concat(String trg, String[] srcs) 
       throws IOException, UnresolvedLinkException, SnapshotAccessControlException;
 
@@ -460,6 +467,7 @@ public interface ClientProtocol {
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    */
+  @AtMostOnce
   public void rename2(String src, String dst, Options.Rename... options)
       throws AccessControlException, DSQuotaExceededException,
       FileAlreadyExistsException, FileNotFoundException,
@@ -484,6 +492,7 @@ public interface ClientProtocol {
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    */
+  @AtMostOnce
   public boolean delete(String src, boolean recursive)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, SnapshotAccessControlException, IOException;
@@ -704,6 +713,7 @@ public interface ClientProtocol {
    * @throws AccessControlException if the superuser privilege is violated.
    * @throws IOException if image creation failed.
    */
+  @AtMostOnce
   public void saveNamespace() throws AccessControlException, IOException;
 
   
@@ -725,6 +735,7 @@ public interface ClientProtocol {
    * 
    * @throws AccessControlException if the superuser privilege is violated.
    */
+  @Idempotent
   public boolean restoreFailedStorage(String arg) 
       throws AccessControlException, IOException;
 
@@ -732,6 +743,7 @@ public interface ClientProtocol {
    * Tells the namenode to reread the hosts and exclude files. 
    * @throws IOException
    */
+  @Idempotent
   public void refreshNodes() throws IOException;
 
   /**
@@ -741,6 +753,7 @@ public interface ClientProtocol {
    * 
    * @throws IOException
    */
+  @Idempotent
   public void finalizeUpgrade() throws IOException;
 
   /**
@@ -763,6 +776,7 @@ public interface ClientProtocol {
    *
    * @throws IOException
    */
+  @Idempotent
   public void metaSave(String filename) throws IOException;
 
   /**
@@ -918,6 +932,7 @@ public interface ClientProtocol {
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws IOException If an I/O error occurred
    */
+  @AtMostOnce
   public void createSymlink(String target, String link, FsPermission dirPerm,
       boolean createParent) throws AccessControlException,
       FileAlreadyExistsException, FileNotFoundException,
@@ -965,6 +980,7 @@ public interface ClientProtocol {
    * @param newNodes datanodes in the pipeline
    * @throws IOException if any error occurs
    */
+  @AtMostOnce
   public void updatePipeline(String clientName, ExtendedBlock oldBlock, 
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException;
@@ -997,6 +1013,7 @@ public interface ClientProtocol {
    * @param token delegation token
    * @throws IOException
    */
+  @Idempotent
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException;
   
@@ -1005,6 +1022,7 @@ public interface ClientProtocol {
    *         DataTransferProtocol to/from DataNodes.
    * @throws IOException
    */
+  @Idempotent
   public DataEncryptionKey getDataEncryptionKey() throws IOException;
   
   /**
@@ -1014,6 +1032,7 @@ public interface ClientProtocol {
    * @return the snapshot path.
    * @throws IOException
    */
+  @AtMostOnce
   public String createSnapshot(String snapshotRoot, String snapshotName)
       throws IOException;
 
@@ -1023,6 +1042,7 @@ public interface ClientProtocol {
    * @param snapshotName Name of the snapshot for the snapshottable directory
    * @throws IOException
    */
+  @AtMostOnce
   public void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws IOException;
   
@@ -1033,6 +1053,7 @@ public interface ClientProtocol {
    * @param snapshotNewName new name of the snapshot
    * @throws IOException
    */
+  @AtMostOnce
   public void renameSnapshot(String snapshotRoot, String snapshotOldName,
       String snapshotNewName) throws IOException;
   

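The @Idempotent and @AtMostOnce annotations added throughout ClientProtocol tell the retry machinery which calls a failed-over client may simply re-send and which must execute at most once (the latter are answered from a server-side retry cache keyed by clientId and callId). A hedged sketch with stand-in annotations; the real ones live in org.apache.hadoop.io.retry:

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.reflect.Method;

    // Stand-ins for org.apache.hadoop.io.retry.Idempotent / AtMostOnce.
    @Retention(RetentionPolicy.RUNTIME) @interface Idempotent {}
    @Retention(RetentionPolicy.RUNTIME) @interface AtMostOnce {}

    interface ClientOps {
        @Idempotent  boolean restoreFailedStorage(String arg);
        @AtMostOnce  boolean rename(String src, String dst);
    }

    public class RetryPolicyDemo {
        // An @Idempotent call can be blindly re-sent after a failover; an
        // @AtMostOnce call must not re-execute, so the server answers retries
        // from a retry cache -- which is why the editlog now records ClientId
        // and CallId (see EDITLOG_SUPPORT_RETRYCACHE below).
        static boolean safeToResend(Method m) {
            return m.isAnnotationPresent(Idempotent.class);
        }

        public static void main(String[] args) {
            for (Method m : ClientOps.class.getDeclaredMethods()) {
                System.out.println(m.getName() + " safe to resend: " + safeToResend(m));
            }
        }
    }
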
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Mon Aug 12 21:25:49 2013
@@ -327,7 +327,7 @@ public class DatanodeInfo extends Datano
    * Check if the datanode is in stale state. Here if 
    * the namenode has not received heartbeat msg from a 
    * datanode for more than staleInterval (default value is
-   * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT}),
+   * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT}),
    * the datanode will be treated as stale node.
    * 
    * @param staleInterval

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Mon Aug 12 21:25:49 2013
@@ -104,7 +104,9 @@ public class LayoutVersion {
     OPTIMIZE_SNAPSHOT_INODES(-45, -43,
         "Reduce snapshot inode memory footprint", false),
     SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
-        "block IDs in the edits log and image files");
+        "block IDs in the edits log and image files"),
+    EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to " 
+        + "enable rebuilding retry cache in case of HA failover");
     
     final int lv;
     final int ancestorLV;

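EDITLOG_SUPPORT_RETRYCACHE lowers the layout version to -47. Layout versions are negative and decrease as features are added, so each enum constant records its own version and the one it descends from. A simplified sketch of the pattern; the supportedBy() helper is an assumption about how callers consult it:

    // Field names (lv, ancestorLV) follow the snippet above.
    public enum FeatureSketch {
        SEQUENTIAL_BLOCK_ID(-46, -45, "Allocate block IDs sequentially"),
        EDITLOG_SUPPORT_RETRYCACHE(-47, -46,
            "Record ClientId and CallId in editlog for retry-cache rebuilding");

        final int lv;          // layout version introducing the feature
        final int ancestorLV;  // the version this one descends from
        final String description;

        FeatureSketch(int lv, int ancestorLV, String description) {
            this.lv = lv;
            this.ancestorLV = ancestorLV;
            this.description = description;
        }

        // Layout versions grow more negative over time, so a stored image at
        // version v includes this feature when v <= lv.
        boolean supportedBy(int storedLayoutVersion) {
            return storedLayoutVersion <= lv;
        }
    }
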
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Mon Aug 12 21:25:49 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -57,13 +58,15 @@ public interface DataTransferProtocol {
    * @param length maximum number of bytes for this read.
    * @param sendChecksum if false, the DN should skip reading and sending
    *        checksums
+   * @param cachingStrategy  The caching strategy to use.
    */
   public void readBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException;
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException;
 
   /**
    * Write a block to a datanode pipeline.
@@ -89,7 +92,8 @@ public interface DataTransferProtocol {
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      final DataChecksum requestedChecksum) throws IOException;
+      final DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy) throws IOException;
 
   /**
    * Transfer a block to another datanode.

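The new cachingStrategy parameter lets the client state per-request drop-behind and readahead preferences, with null meaning "no preference, use the datanode's configured default". A minimal sketch of such a value object, assuming the real CachingStrategy in org.apache.hadoop.hdfs.server.datanode differs in detail:

    // Stand-in for CachingStrategy; null fields mean "use the server default".
    public class CachingStrategySketch {
        private final Boolean dropBehind;
        private final Long readahead;

        public CachingStrategySketch(Boolean dropBehind, Long readahead) {
            this.dropBehind = dropBehind;
            this.readahead = readahead;
        }

        public Boolean getDropBehind() { return dropBehind; }
        public Long getReadahead() { return readahead; }

        public static CachingStrategySketch newDefaultStrategy() {
            return new CachingStrategySketch(null, null);
        }

        // Resolution against a server default, as BlockReceiver does for
        // dropCacheBehindWrites further down in this commit.
        public boolean effectiveDropBehind(boolean serverDefault) {
            return dropBehind != null ? dropBehind : serverDefault;
        }
    }
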
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Mon Aug 12 21:25:49 2013
@@ -31,8 +31,10 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 
 /** Receiver */
 @InterfaceAudience.Private
@@ -85,6 +87,14 @@ public abstract class Receiver implement
     }
   }
 
+  static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy) {
+    Boolean dropBehind = strategy.hasDropBehind() ?
+        strategy.getDropBehind() : null;
+    Long readahead = strategy.hasReadahead() ?
+        strategy.getReadahead() : null;
+    return new CachingStrategy(dropBehind, readahead);
+  }
+
   /** Receive OP_READ_BLOCK */
   private void opReadBlock() throws IOException {
     OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
@@ -93,7 +103,10 @@ public abstract class Receiver implement
         proto.getHeader().getClientName(),
         proto.getOffset(),
         proto.getLen(),
-        proto.getSendChecksums());
+        proto.getSendChecksums(),
+        (proto.hasCachingStrategy() ?
+            getCachingStrategy(proto.getCachingStrategy()) :
+          CachingStrategy.newDefaultStrategy()));
   }
   
   /** Receive OP_WRITE_BLOCK */
@@ -108,7 +121,10 @@ public abstract class Receiver implement
         proto.getPipelineSize(),
         proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
         proto.getLatestGenerationStamp(),
-        fromProto(proto.getRequestedChecksum()));
+        fromProto(proto.getRequestedChecksum()),
+        (proto.hasCachingStrategy() ?
+            getCachingStrategy(proto.getCachingStrategy()) :
+          CachingStrategy.newDefaultStrategy()));
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */

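Because the caching strategy is an optional protobuf field, a request from an older client simply lacks it and the server falls back to CachingStrategy.newDefaultStrategy(), as the ternaries above show. A protobuf-free sketch of that decode path; Wire and Strategy are illustrative stand-ins for the generated CachingStrategyProto and the real CachingStrategy:

    public class StrategyDecode {
        static class Wire {
            Boolean dropBehind;   // null == optional field absent on the wire
            Long readahead;
            boolean hasDropBehind() { return dropBehind != null; }
            boolean hasReadahead()  { return readahead != null; }
        }

        static class Strategy {
            final Boolean dropBehind;
            final Long readahead;
            Strategy(Boolean d, Long r) { dropBehind = d; readahead = r; }
            static Strategy newDefaultStrategy() { return new Strategy(null, null); }
        }

        static Strategy decode(Wire w) {
            if (w == null) {
                // Old client: the request carried no caching strategy at all.
                return Strategy.newDefaultStrategy();
            }
            return new Strategy(
                w.hasDropBehind() ? w.dropBehind : null,
                w.hasReadahead()  ? w.readahead  : null);
        }
    }
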
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Mon Aug 12 21:25:49 2013
@@ -35,9 +35,11 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -72,19 +74,32 @@ public class Sender implements DataTrans
     out.flush();
   }
 
+  static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
+    CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
+    if (cachingStrategy.getReadahead() != null) {
+      builder.setReadahead(cachingStrategy.getReadahead().longValue());
+    }
+    if (cachingStrategy.getDropBehind() != null) {
+      builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
+    }
+    return builder.build();
+  }
+
   @Override
   public void readBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException {
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
 
     OpReadBlockProto proto = OpReadBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
       .setOffset(blockOffset)
       .setLen(length)
       .setSendChecksums(sendChecksum)
+      .setCachingStrategy(getCachingStrategy(cachingStrategy))
       .build();
 
     send(out, Op.READ_BLOCK, proto);
@@ -102,7 +117,8 @@ public class Sender implements DataTrans
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      DataChecksum requestedChecksum) throws IOException {
+      DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
     
@@ -117,7 +133,8 @@ public class Sender implements DataTrans
       .setMinBytesRcvd(minBytesRcvd)
       .setMaxBytesRcvd(maxBytesRcvd)
       .setLatestGenerationStamp(latestGenerationStamp)
-      .setRequestedChecksum(checksumProto);
+      .setRequestedChecksum(checksumProto)
+      .setCachingStrategy(getCachingStrategy(cachingStrategy));
     
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Mon Aug 12 21:25:49 2013
@@ -707,11 +707,9 @@ public class ClientNamenodeProtocolServe
     try {
       HdfsFileStatus result = server.getFileLinkInfo(req.getSrc());
       if (result != null) {
-        System.out.println("got non null result for getFileLinkInfo for " + req.getSrc());
         return GetFileLinkInfoResponseProto.newBuilder().setFs(
             PBHelper.convert(result)).build();
       } else {
-        System.out.println("got  null result for getFileLinkInfo for " + req.getSrc());
         return VOID_GETFILELINKINFO_RESPONSE;      
       }
 

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Mon Aug 12 21:25:49 2013
@@ -1056,7 +1056,7 @@ public class PBHelper {
         fs.getPath().toByteArray(),
         fs.hasFileId()? fs.getFileId(): INodeId.GRANDFATHER_INODE_ID,
         fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null,
-        fs.hasChildrenNum() ? fs.getChildrenNum() : 0);
+        fs.hasChildrenNum() ? fs.getChildrenNum() : -1);
   }
 
   public static SnapshottableDirectoryStatus convert(

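The default childrenNum changes from 0 to -1 so that "the server never sent the field" is distinguishable from "directory with zero children". A tiny sketch of the sentinel, with hypothetical names:

    public class ChildrenNumSketch {
        static final int CHILDREN_NUM_UNKNOWN = -1;
        private final int childrenNum;

        ChildrenNumSketch(boolean hasChildrenNum, int wireValue) {
            // With a 0 default, an empty directory and a server that never
            // sent the field were indistinguishable; -1 separates the cases.
            this.childrenNum = hasChildrenNum ? wireValue : CHILDREN_NUM_UNKNOWN;
        }

        boolean isChildrenNumKnown() { return childrenNum != CHILDREN_NUM_UNKNOWN; }
        int getChildrenNum() { return childrenNum; }
    }
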
Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/overview.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Mon Aug 12 21:25:49 2013
@@ -22,7 +22,7 @@ import java.util.LinkedList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.util.LightWeightGSet;
+import org.apache.hadoop.util.LightWeightGSet;
 
 /**
  * BlockInfo class maintains for a given block

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Aug 12 21:25:49 2013
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
@@ -86,7 +87,7 @@ import com.google.common.collect.Sets;
 public class BlockManager {
 
   static final Log LOG = LogFactory.getLog(BlockManager.class);
-  static final Log blockLog = NameNode.blockStateChangeLog;
+  public static final Log blockLog = NameNode.blockStateChangeLog;
 
   /** Default load factor of map */
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
@@ -216,6 +217,9 @@ public class BlockManager {
   
   // whether or not to issue block encryption keys.
   final boolean encryptDataTransfer;
+  
+  // Max number of blocks to log info about during a block report.
+  private final long maxNumBlocksToLog;
 
   /**
    * When running inside a Standby node, the node may receive block reports
@@ -297,6 +301,10 @@ public class BlockManager {
         conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
             DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
     
+    this.maxNumBlocksToLog =
+        conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
+            DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
+    
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
     LOG.info("minReplication             = " + minReplication);
@@ -304,6 +312,7 @@ public class BlockManager {
     LOG.info("shouldCheckForEnoughRacks  = " + shouldCheckForEnoughRacks);
     LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
     LOG.info("encryptDataTransfer        = " + encryptDataTransfer);
+    LOG.info("maxNumBlocksToLog          = " + maxNumBlocksToLog);
   }
 
   private static BlockTokenSecretManager createBlockTokenSecretManager(
@@ -314,6 +323,12 @@ public class BlockManager {
     LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + "=" + isEnabled);
 
     if (!isEnabled) {
+      if (UserGroupInformation.isSecurityEnabled()) {
+	      LOG.error("Security is enabled but block access tokens " +
+		      "(via " + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + ") " +
+		      "aren't enabled. This may cause issues " +
+		      "when clients attempt to talk to a DataNode.");
+      }
       return null;
     }
 
@@ -845,8 +860,10 @@ public class BlockManager {
   public void setBlockToken(final LocatedBlock b,
       final BlockTokenSecretManager.AccessMode mode) throws IOException {
     if (isBlockTokenEnabled()) {
-      b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(), 
-          EnumSet.of(mode)));
+      // Use cached UGI if serving RPC calls.
+      b.setBlockToken(blockTokenSecretManager.generateToken(
+          NameNode.getRemoteUser().getShortUserName(),
+          b.getBlock(), EnumSet.of(mode)));
     }    
   }
 
@@ -1319,7 +1336,7 @@ public class BlockManager {
           // Move the block-replication into a "pending" state.
           // The reason we use 'pending' is so we can retry
           // replications that fail after an appropriate amount of time.
-          pendingReplications.increment(block, targets.length);
+          pendingReplications.increment(block, targets);
           if(blockLog.isDebugEnabled()) {
             blockLog.debug(
                 "BLOCK* block " + block
@@ -1698,8 +1715,14 @@ public class BlockManager {
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
+    int numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, null, true);
+      addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+      numBlocksLogged++;
+    }
+    if (numBlocksLogged > maxNumBlocksToLog) {
+      blockLog.info("BLOCK* processReport: logged info for " + maxNumBlocksToLog
+          + " of " + numBlocksLogged + " reported.");
     }
     for (Block b : toInvalidate) {
       blockLog.info("BLOCK* processReport: "
@@ -2599,6 +2622,8 @@ assert storedBlock.findDatanode(dn) < 0 
   void addBlock(DatanodeDescriptor node, Block block, String delHint)
       throws IOException {
     // decrement number of blocks scheduled to this datanode.
+    // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with 
+    // RECEIVED_BLOCK), we currently also decrease the approximate number. 
     node.decBlocksScheduled();
 
     // get the deletion hint node
@@ -2614,7 +2639,7 @@ assert storedBlock.findDatanode(dn) < 0 
     //
     // Modify the blocks->datanode map and node's map.
     //
-    pendingReplications.decrement(block);
+    pendingReplications.decrement(block, node);
     processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
         delHintNode);
   }
@@ -2637,8 +2662,14 @@ assert storedBlock.findDatanode(dn) < 0 
     for (StatefulBlockInfo b : toUC) { 
       addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
     }
+    long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, delHintNode, true);
+      addStoredBlock(b, node, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+      numBlocksLogged++;
+    }
+    if (numBlocksLogged > maxNumBlocksToLog) {
+      blockLog.info("BLOCK* addBlock: logged info for " + maxNumBlocksToLog
+          + " of " + numBlocksLogged + " reported.");
     }
     for (Block b : toInvalidate) {
       blockLog.info("BLOCK* addBlock: block "
@@ -2655,64 +2686,58 @@ assert storedBlock.findDatanode(dn) < 0 
    * The given node is reporting incremental information about some blocks.
    * This includes blocks that are starting to be received, completed being
    * received, or deleted.
+   * 
+   * This method must be called with FSNamesystem lock held.
    */
-  public void processIncrementalBlockReport(final DatanodeID nodeID, 
-     final String poolId, 
-     final ReceivedDeletedBlockInfo blockInfos[]
-  ) throws IOException {
-    namesystem.writeLock();
+  public void processIncrementalBlockReport(final DatanodeID nodeID,
+      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      throws IOException {
+    assert namesystem.hasWriteLock();
     int received = 0;
     int deleted = 0;
     int receiving = 0;
-    try {
-      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
-        blockLog
-            .warn("BLOCK* processIncrementalBlockReport"
-                + " is received from dead or unregistered node "
-                + nodeID);
-        throw new IOException(
-            "Got incremental block report from unregistered or dead node");
-      }
+    final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+    if (node == null || !node.isAlive) {
+      blockLog
+          .warn("BLOCK* processIncrementalBlockReport"
+              + " is received from dead or unregistered node "
+              + nodeID);
+      throw new IOException(
+          "Got incremental block report from unregistered or dead node");
+    }
 
-      for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
-        switch (rdbi.getStatus()) {
-        case DELETED_BLOCK:
-          removeStoredBlock(rdbi.getBlock(), node);
-          deleted++;
-          break;
-        case RECEIVED_BLOCK:
-          addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
-          received++;
-          break;
-        case RECEIVING_BLOCK:
-          receiving++;
-          processAndHandleReportedBlock(node, rdbi.getBlock(),
-              ReplicaState.RBW, null);
-          break;
-        default:
-          String msg = 
-            "Unknown block status code reported by " + nodeID +
-            ": " + rdbi;
-          blockLog.warn(msg);
-          assert false : msg; // if assertions are enabled, throw.
-          break;
-        }
-        if (blockLog.isDebugEnabled()) {
-          blockLog.debug("BLOCK* block "
-              + (rdbi.getStatus()) + ": " + rdbi.getBlock()
-              + " is received from " + nodeID);
-        }
+    for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+      switch (rdbi.getStatus()) {
+      case DELETED_BLOCK:
+        removeStoredBlock(rdbi.getBlock(), node);
+        deleted++;
+        break;
+      case RECEIVED_BLOCK:
+        addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+        received++;
+        break;
+      case RECEIVING_BLOCK:
+        receiving++;
+        processAndHandleReportedBlock(node, rdbi.getBlock(),
+            ReplicaState.RBW, null);
+        break;
+      default:
+        String msg = 
+          "Unknown block status code reported by " + nodeID +
+          ": " + rdbi;
+        blockLog.warn(msg);
+        assert false : msg; // if assertions are enabled, throw.
+        break;
+      }
+      if (blockLog.isDebugEnabled()) {
+        blockLog.debug("BLOCK* block "
+            + (rdbi.getStatus()) + ": " + rdbi.getBlock()
+            + " is received from " + nodeID);
       }
-    } finally {
-      namesystem.writeUnlock();
-      blockLog
-          .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
-              + nodeID
-              +  " receiving: " + receiving + ", "
-              + " received: " + received + ", "
-              + " deleted: " + deleted);
     }
+    blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+        + nodeID + " receiving: " + receiving + ", " + " received: " + received
+        + ", " + " deleted: " + deleted);
   }
 
   /**

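One thread running through the BlockManager changes: block reports can carry huge numbers of blocks, so processReport() and addBlock() now log full detail for at most maxNumBlocksToLog blocks and emit a single summary line past that. A self-contained sketch of the throttling pattern, with System.out standing in for blockLog:

    import java.util.List;

    public class ThrottledBlockLog {
        private final long maxNumBlocksToLog;

        public ThrottledBlockLog(long maxNumBlocksToLog) {
            this.maxNumBlocksToLog = maxNumBlocksToLog;
        }

        void addStoredBlocks(List<String> toAdd) {
            long numBlocksLogged = 0;
            for (String b : toAdd) {
                // Log full detail only for the first maxNumBlocksToLog blocks.
                if (numBlocksLogged < maxNumBlocksToLog) {
                    System.out.println("BLOCK* addStoredBlock: " + b);
                }
                numBlocksLogged++;
            }
            // Past the cap, a single summary line replaces per-block output.
            if (numBlocksLogged > maxNumBlocksToLog) {
                System.out.println("BLOCK* logged info for " + maxNumBlocksToLog
                    + " of " + numBlocksLogged + " reported.");
            }
        }
    }
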
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Mon Aug 12 21:25:49 2013
@@ -20,8 +20,8 @@ package org.apache.hadoop.hdfs.server.bl
 import java.util.Iterator;
 
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.util.GSet;
-import org.apache.hadoop.hdfs.util.LightWeightGSet;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
 
 /**
  * This class maintains the map from a block to its metadata.

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Mon Aug 12 21:25:49 2013
@@ -22,8 +22,10 @@ import static org.apache.hadoop.util.Tim
 import java.io.PrintWriter;
 import java.sql.Time;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -71,14 +73,16 @@ class PendingReplicationBlocks {
 
   /**
    * Add a block to the list of pending Replications
+   * @param block The corresponding block
+   * @param targets The DataNodes where replicas of the block should be placed
    */
-  void increment(Block block, int numReplicas) {
+  void increment(Block block, DatanodeDescriptor[] targets) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found == null) {
-        pendingReplications.put(block, new PendingBlockInfo(numReplicas));
+        pendingReplications.put(block, new PendingBlockInfo(targets));
       } else {
-        found.incrementReplicas(numReplicas);
+        found.incrementReplicas(targets);
         found.setTimeStamp();
       }
     }
@@ -88,15 +92,17 @@ class PendingReplicationBlocks {
    * One replication request for this block has finished.
    * Decrement the number of pending replication requests
    * for this block.
+   * 
+   * @param dn The DataNode that finishes the replication
    */
-  void decrement(Block block) {
+  void decrement(Block block, DatanodeDescriptor dn) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
         if(LOG.isDebugEnabled()) {
           LOG.debug("Removing pending replication for " + block);
         }
-        found.decrementReplicas();
+        found.decrementReplicas(dn);
         if (found.getNumReplicas() <= 0) {
           pendingReplications.remove(block);
         }
@@ -153,7 +159,7 @@ class PendingReplicationBlocks {
         return null;
       }
       Block[] blockList = timedOutItems.toArray(
-                                                new Block[timedOutItems.size()]);
+          new Block[timedOutItems.size()]);
       timedOutItems.clear();
       return blockList;
     }
@@ -163,16 +169,17 @@ class PendingReplicationBlocks {
    * An object that contains information about a block that 
    * is being replicated. It records the timestamp when the 
    * system started replicating the most recent copy of this
-   * block. It also records the number of replication
-   * requests that are in progress.
+   * block. It also records the list of Datanodes where the 
+   * replication requests are in progress.
    */
   static class PendingBlockInfo {
     private long timeStamp;
-    private int numReplicasInProgress;
+    private final List<DatanodeDescriptor> targets;
 
-    PendingBlockInfo(int numReplicas) {
+    PendingBlockInfo(DatanodeDescriptor[] targets) {
       this.timeStamp = now();
-      this.numReplicasInProgress = numReplicas;
+      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+          : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
     }
 
     long getTimeStamp() {
@@ -183,17 +190,20 @@ class PendingReplicationBlocks {
       timeStamp = now();
     }
 
-    void incrementReplicas(int increment) {
-      numReplicasInProgress += increment;
+    void incrementReplicas(DatanodeDescriptor... newTargets) {
+      if (newTargets != null) {
+        for (DatanodeDescriptor dn : newTargets) {
+          targets.add(dn);
+        }
+      }
     }
 
-    void decrementReplicas() {
-      numReplicasInProgress--;
-      assert(numReplicasInProgress >= 0);
+    void decrementReplicas(DatanodeDescriptor dn) {
+      targets.remove(dn);
     }
 
     int getNumReplicas() {
-      return numReplicasInProgress;
+      return targets.size();
     }
   }
 
@@ -274,7 +284,7 @@ class PendingReplicationBlocks {
         out.println(block + 
                     " StartTime: " + new Time(pendingBlock.timeStamp) +
                     " NumReplicaInProgress: " + 
-                    pendingBlock.numReplicasInProgress);
+                    pendingBlock.getNumReplicas());
       }
     }
   }

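The shift from a bare numReplicasInProgress counter to a target list means a finished replication is debited from the exact datanode that performed it, and a timed-out entry still names the nodes that never reported back. A simplified stand-in, with String in place of DatanodeDescriptor:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PendingBlockInfoSketch {
        private long timeStamp = System.currentTimeMillis();
        private final List<String> targets;

        PendingBlockInfoSketch(String[] initialTargets) {
            this.targets = initialTargets == null
                ? new ArrayList<String>()
                : new ArrayList<String>(Arrays.asList(initialTargets));
        }

        void incrementReplicas(String... newTargets) {
            if (newTargets != null) {
                targets.addAll(Arrays.asList(newTargets));
            }
        }

        // A finished replication is debited from the exact datanode that did
        // it; whatever remains in targets has not reported back yet.
        void decrementReplicas(String dn) {
            targets.remove(dn);
        }

        int getNumReplicas() { return targets.size(); }
        long getTimeStamp() { return timeStamp; }
    }
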
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Mon Aug 12 21:25:49 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
@@ -217,7 +218,7 @@ public class JspHelper {
         "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
         new DatanodeID(addr.getAddress().getHostAddress(),
             addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
-            null, null, false);
+            null, null, false, CachingStrategy.newDefaultStrategy());
         
     final byte[] buf = new byte[amtToRead];
     int readOffset = 0;

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Mon Aug 12 21:25:49 2013
@@ -48,9 +48,9 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.hdfs.util.GSet;
-import org.apache.hadoop.hdfs.util.LightWeightGSet;
-import org.apache.hadoop.hdfs.util.LightWeightGSet.LinkedElement;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Time;
 
@@ -417,7 +417,7 @@ class BlockPoolSliceScanner {
         adjustThrottler();
         
         blockSender = new BlockSender(block, 0, -1, false, true, true, 
-            datanode, null);
+            datanode, null, CachingStrategy.newDropBehind());
 
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Aug 12 21:25:49 2013
@@ -51,6 +51,9 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /** A class that receives a block and writes to its own disk, meanwhile
 * may copy it to another site. If a throttler is provided,
@@ -60,7 +63,8 @@ class BlockReceiver implements Closeable
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
 
-  private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
+  @VisibleForTesting
+  static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
   
   private DataInputStream in = null; // from where data are read
   private DataChecksum clientChecksum; // checksum used by client
@@ -96,8 +100,8 @@ class BlockReceiver implements Closeable
 
   // Cache management state
   private boolean dropCacheBehindWrites;
+  private long lastCacheManagementOffset = 0;
   private boolean syncBehindWrites;
-  private long lastCacheDropOffset = 0;
 
   /** The client name.  It is empty if a datanode is the client */
   private final String clientname;
@@ -119,8 +123,8 @@ class BlockReceiver implements Closeable
       final BlockConstructionStage stage, 
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
       final String clientname, final DatanodeInfo srcDataNode,
-      final DataNode datanode, DataChecksum requestedChecksum)
-      throws IOException {
+      final DataNode datanode, DataChecksum requestedChecksum,
+      CachingStrategy cachingStrategy) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -145,6 +149,7 @@ class BlockReceiver implements Closeable
             + "\n  isClient  =" + isClient + ", clientname=" + clientname
             + "\n  isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
             + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
+            + "\n  cachingStrategy = " + cachingStrategy
             );
       }
 
@@ -191,7 +196,9 @@ class BlockReceiver implements Closeable
               " while receiving block " + block + " from " + inAddr);
         }
       }
-      this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
+      this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
+        datanode.getDnConf().dropCacheBehindWrites :
+          cachingStrategy.getDropBehind();
       this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
       
       final boolean isCreate = isDatanode || isTransfer 
@@ -597,7 +604,7 @@ class BlockReceiver implements Closeable
 
           datanode.metrics.incrBytesWritten(len);
 
-          dropOsCacheBehindWriter(offsetInBlock);
+          manageWriterOsCache(offsetInBlock);
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -619,25 +626,44 @@ class BlockReceiver implements Closeable
     return lastPacketInBlock?-1:len;
   }
 
-  private void dropOsCacheBehindWriter(long offsetInBlock) {
+  private void manageWriterOsCache(long offsetInBlock) {
     try {
       if (outFd != null &&
-          offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
-        long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
-        if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
-          NativeIO.POSIX.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
-              NativeIO.POSIX.POSIX_FADV_DONTNEED);
-        }
-        
+          offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
+        //
+        // For SYNC_FILE_RANGE_WRITE, we want to sync from
+        // lastCacheManagementOffset to a position "two windows ago"
+        //
+        //                         <========= sync ===========>
+        // +-----------------------O--------------------------X
+        // start                  last                      curPos
+        // of file                 
+        //
         if (syncBehindWrites) {
-          NativeIO.POSIX.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
+          NativeIO.POSIX.syncFileRangeIfPossible(outFd,
+              lastCacheManagementOffset,
+              offsetInBlock - lastCacheManagementOffset,
               NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
         }
-        
-        lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+        //
+        // For POSIX_FADV_DONTNEED, we want to drop from the beginning 
+        // of the file to a position prior to the current position.
+        //
+        // <=== drop =====> 
+        //                 <---W--->
+        // +--------------+--------O--------------------------X
+        // start        dropPos   last                      curPos
+        // of file             
+        //                     
+        long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
+        if (dropPos > 0 && dropCacheBehindWrites) {
+          NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+              outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
+        }
+        lastCacheManagementOffset = offsetInBlock;
       }
     } catch (Throwable t) {
-      LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+      LOG.warn("Error managing cache for writer of block " + block, t);
     }
   }
 
@@ -706,7 +732,13 @@ class BlockReceiver implements Closeable
       }
       if (responder != null) {
         try {
-          responder.join();
+          responder.join(datanode.getDnConf().getXceiverStopTimeout());
+          if (responder.isAlive()) {
+            String msg = "Join on responder thread " + responder
+                + " timed out";
+            LOG.warn(msg + "\n" + StringUtils.getStackTrace(responder));
+            throw new IOException(msg);
+          }
         } catch (InterruptedException e) {
           responder.interrupt();
           throw new IOException("Interrupted receiveBlock");

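The rewritten manageWriterOsCache() above pairs two kernel hints: sync_file_range(SYNC_FILE_RANGE_WRITE) over the bytes written since the last management pass, and posix_fadvise(POSIX_FADV_DONTNEED) over everything at least one lag window behind that. The windowing logic, condensed into a standalone sketch (the NativeIO.POSIX calls and the 8 MB constant are taken from the hunk above; the wrapper class and method names are illustrative):

    import java.io.FileDescriptor;

    import org.apache.hadoop.io.nativeio.NativeIO;

    /** Sketch of the write-side cache window in manageWriterOsCache(). */
    class WriterCacheSketch {
      static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;

      private long lastCacheManagementOffset = 0;

      void manage(String blockName, FileDescriptor outFd, long offsetInBlock,
          boolean syncBehindWrites, boolean dropCacheBehindWrites) {
        if (outFd == null ||
            offsetInBlock <= lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
          return; // less than one full window written since the last pass
        }
        try {
          if (syncBehindWrites) {
            // Start writeback for everything written since the last pass.
            NativeIO.POSIX.syncFileRangeIfPossible(outFd,
                lastCacheManagementOffset,
                offsetInBlock - lastCacheManagementOffset,
                NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
          }
          // Drop pages at least one full window behind the last pass; by
          // now they should be on disk and will not be read back here.
          long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
          if (dropPos > 0 && dropCacheBehindWrites) {
            NativeIO.POSIX.posixFadviseIfPossible(blockName, outFd, 0,
                dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
          }
          lastCacheManagementOffset = offsetInBlock;
        } catch (Throwable t) {
          // Cache hints are best-effort; ignored here, while the patch
          // logs a warning instead of treating the failure as fatal.
        }
      }
    }
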
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Mon Aug 12 21:25:49 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.nativeio.Nat
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -141,13 +142,22 @@ class BlockSender implements java.io.Clo
 
   // Cache-management related fields
   private final long readaheadLength;
-  private boolean shouldDropCacheBehindRead;
+
   private ReadaheadRequest curReadahead;
+
+  private final boolean alwaysReadahead;
+  
+  private final boolean dropCacheBehindLargeReads;
+  
+  private final boolean dropCacheBehindAllReads;
+  
   private long lastCacheDropOffset;
-  private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  
+  @VisibleForTesting
+  static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  
   /**
-   * Minimum length of read below which management of the OS
-   * buffer cache is disabled.
+   * See {@link BlockSender#isLongRead()}.
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
   
@@ -167,16 +177,42 @@ class BlockSender implements java.io.Clo
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean verifyChecksum,
-              boolean sendChecksum,
-              DataNode datanode, String clientTraceFmt)
+              boolean sendChecksum, DataNode datanode, String clientTraceFmt,
+              CachingStrategy cachingStrategy)
       throws IOException {
     try {
       this.block = block;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
       this.clientTraceFmt = clientTraceFmt;
-      this.readaheadLength = datanode.getDnConf().readaheadLength;
-      this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
+
+      /*
+       * If the client asked for the cache to be dropped behind all reads,
+       * we honor that.  Otherwise, we use the DataNode defaults.
+       * When using DataNode defaults, we use a heuristic where we only
+       * drop the cache for large reads.
+       */
+      if (cachingStrategy.getDropBehind() == null) {
+        this.dropCacheBehindAllReads = false;
+        this.dropCacheBehindLargeReads =
+            datanode.getDnConf().dropCacheBehindReads;
+      } else {
+        this.dropCacheBehindAllReads =
+            this.dropCacheBehindLargeReads =
+                 cachingStrategy.getDropBehind().booleanValue();
+      }
+      /*
+       * Similarly, if readahead was explicitly requested, we always do it.
+       * Otherwise, we read ahead based on the DataNode settings, and only
+       * when the reads are large.
+       */
+      if (cachingStrategy.getReadahead() == null) {
+        this.alwaysReadahead = false;
+        this.readaheadLength = datanode.getDnConf().readaheadLength;
+      } else {
+        this.alwaysReadahead = true;
+        this.readaheadLength = cachingStrategy.getReadahead().longValue();
+      }
       this.datanode = datanode;
       
       if (verifyChecksum) {
@@ -335,10 +371,11 @@ class BlockSender implements java.io.Clo
    */
   @Override
   public void close() throws IOException {
-    if (blockInFd != null && shouldDropCacheBehindRead && isLongRead()) {
-      // drop the last few MB of the file from cache
+    if (blockInFd != null &&
+        ((dropCacheBehindAllReads) ||
+         (dropCacheBehindLargeReads && isLongRead()))) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(
+        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
             blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
             NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Exception e) {
@@ -637,7 +674,7 @@ class BlockSender implements java.io.Clo
 
     if (isLongRead() && blockInFd != null) {
       // Advise that this file descriptor will be accessed sequentially.
-      NativeIO.POSIX.posixFadviseIfPossible(
+      NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
           blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
     }
     
@@ -705,37 +742,47 @@ class BlockSender implements java.io.Clo
    * and drop-behind.
    */
   private void manageOsCache() throws IOException {
-    if (!isLongRead() || blockInFd == null) {
-      // don't manage cache manually for short-reads, like
-      // HBase random read workloads.
-      return;
-    }
+    // We can't manage the cache for this block if we don't have a file
+    // descriptor to work with.
+    if (blockInFd == null) return;
 
     // Perform readahead if necessary
-    if (readaheadLength > 0 && datanode.readaheadPool != null) {
+    if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
+          (alwaysReadahead || isLongRead())) {
       curReadahead = datanode.readaheadPool.readaheadStream(
-          clientTraceFmt, blockInFd,
-          offset, readaheadLength, Long.MAX_VALUE,
+          clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
           curReadahead);
     }
 
     // Drop what we've just read from cache, since we aren't
     // likely to need it again
-    long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
-    if (shouldDropCacheBehindRead &&
-        offset >= nextCacheDropOffset) {
-      long dropLength = offset - lastCacheDropOffset;
-      if (dropLength >= 1024) {
-        NativeIO.POSIX.posixFadviseIfPossible(blockInFd,
-            lastCacheDropOffset, dropLength,
+    if (dropCacheBehindAllReads ||
+        (dropCacheBehindLargeReads && isLongRead())) {
+      long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+      if (offset >= nextCacheDropOffset) {
+        long dropLength = offset - lastCacheDropOffset;
+        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+            blockInFd, lastCacheDropOffset, dropLength,
             NativeIO.POSIX.POSIX_FADV_DONTNEED);
+        lastCacheDropOffset = offset;
       }
-      lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
     }
   }
 
+  /**
+   * Returns true if we have done a long enough read for this block to qualify
+   * for the DataNode-wide cache management defaults.  We avoid applying the
+   * cache management defaults to smaller reads because the overhead would be
+   * too high.
+   *
+   * Note that if the client explicitly asked for dropBehind, we will do it
+   * even on short reads.
+   * 
+   * This is also used to determine when to invoke
+   * posix_fadvise(POSIX_FADV_SEQUENTIAL).
+   */
   private boolean isLongRead() {
-    return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+    return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
   }
 
   /**

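The constructor hunk above resolves the effective read-side policy from the client's CachingStrategy: a null dropBehind or readahead means the client stated no preference, so the DataNode default applies, and (for drop-behind and readahead alike) only to long reads. The same resolution in isolation; ReadPolicy and its parameter names are illustrative, while the null handling mirrors the patch:

    /** Sketch: resolving read-side cache policy from an optional hint. */
    class ReadPolicy {
      final boolean dropBehindAllReads;   // client explicitly asked
      final boolean dropBehindLargeReads; // DataNode default, long reads only
      final boolean alwaysReadahead;
      final long readaheadLength;

      ReadPolicy(Boolean clientDropBehind, Long clientReadahead,
          boolean dnDropBehindReads, long dnReadaheadLength) {
        if (clientDropBehind == null) {
          // No client preference: use the DataNode default, which the
          // caller applies only when isLongRead() is true.
          this.dropBehindAllReads = false;
          this.dropBehindLargeReads = dnDropBehindReads;
        } else {
          this.dropBehindAllReads = clientDropBehind.booleanValue();
          this.dropBehindLargeReads = clientDropBehind.booleanValue();
        }
        if (clientReadahead == null) {
          this.alwaysReadahead = false;
          this.readaheadLength = dnReadaheadLength;
        } else {
          // Explicitly requested readahead applies even to short reads.
          this.alwaysReadahead = true;
          this.readaheadLength = clientReadahead.longValue();
        }
      }
    }
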
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Mon Aug 12 21:25:49 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
@@ -29,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
@@ -44,7 +47,8 @@ import org.apache.hadoop.hdfs.server.com
  * Simple class encapsulating all of the configuration that the DataNode
  * loads at startup time.
  */
-class DNConf {
+@InterfaceAudience.Private
+public class DNConf {
   final int socketTimeout;
   final int socketWriteTimeout;
   final int socketKeepaliveTimeout;
@@ -66,6 +70,8 @@ class DNConf {
   
   final String minimumNameNodeVersion;
   final String encryptionAlgorithm;
+  
+  final long xceiverStopTimeout;
 
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -127,10 +133,18 @@ class DNConf {
     this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
     this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+    
+    this.xceiverStopTimeout = conf.getLong(
+        DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
+        DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
   }
   
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;
   }
+  
+  public long getXceiverStopTimeout() {
+    return xceiverStopTimeout;
+  }
 }

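With the two DFSConfigKeys constants wired into DNConf above, the stop timeout becomes an ordinary tunable. A minimal sketch of overriding it programmatically (assumes hadoop-hdfs on the classpath; the example class and the 30-second value are illustrative):

    import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.datanode.DNConf;

    public class XceiverStopTimeoutExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Bound how long BlockReceiver/ReplicaInPipeline will wait for an
        // interrupted writer or responder thread before giving up.
        conf.setLong(DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 30000L);
        DNConf dnConf = new DNConf(conf);
        System.out.println("xceiver stop timeout = "
            + dnConf.getXceiverStopTimeout() + "ms");
      }
    }
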
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Aug 12 21:25:49 2013
@@ -171,6 +171,7 @@ import org.apache.hadoop.util.DiskChecke
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -284,6 +285,8 @@ public class DataNode extends Configured
   // For InterDataNodeProtocol
   public RPC.Server ipcServer;
 
+  private JvmPauseMonitor pauseMonitor;
+
   private SecureResources secureResources = null;
   private AbstractList<File> dataDirs;
   private Configuration conf;
@@ -399,7 +402,7 @@ public class DataNode extends Configured
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
           DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0));
       Configuration sslConf = new HdfsConfiguration(false);
-      sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
+      sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
           "ssl-server.xml"));
       this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
       if(LOG.isDebugEnabled()) {
@@ -447,10 +450,15 @@ public class DataNode extends Configured
           new ClientDatanodeProtocolServerSideTranslatorPB(this);
     BlockingService service = ClientDatanodeProtocolService
         .newReflectiveBlockingService(clientDatanodeProtocolXlator);
-    ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr
-        .getHostName(), ipcAddr.getPort(), conf.getInt(
-        DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT),
-        false, conf, blockPoolTokenSecretManager);
+    ipcServer = new RPC.Builder(conf)
+        .setProtocol(ClientDatanodeProtocolPB.class)
+        .setInstance(service)
+        .setBindAddress(ipcAddr.getHostName())
+        .setPort(ipcAddr.getPort())
+        .setNumHandlers(
+            conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
+                DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
+        .setSecretManager(blockPoolTokenSecretManager).build();
     
     InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = 
         new InterDatanodeProtocolServerSideTranslatorPB(this);
@@ -739,6 +747,8 @@ public class DataNode extends Configured
     registerMXBean();
     initDataXceiver(conf);
     startInfoServer(conf);
+    pauseMonitor = new JvmPauseMonitor(conf);
+    pauseMonitor.start();
   
     // BlockPoolTokenSecretManager is required to create ipc server.
     this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
@@ -977,7 +987,8 @@ public class DataNode extends Configured
    * @return BP registration object
    * @throws IOException
    */
-  DatanodeRegistration getDNRegistrationForBP(String bpid) 
+  @VisibleForTesting
+  public DatanodeRegistration getDNRegistrationForBP(String bpid) 
   throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos==null || bpos.bpRegistration==null) {
@@ -1221,6 +1232,9 @@ public class DataNode extends Configured
     if (ipcServer != null) {
       ipcServer.stop();
     }
+    if (pauseMonitor != null) {
+      pauseMonitor.stop();
+    }
     
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
@@ -1511,6 +1525,7 @@ public class DataNode extends Configured
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
     final String clientname;
+    final CachingStrategy cachingStrategy;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
@@ -1531,6 +1546,8 @@ public class DataNode extends Configured
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
       bpReg = bpos.bpRegistration;
       this.clientname = clientname;
+      this.cachingStrategy =
+          new CachingStrategy(true, getDnConf().readaheadLength);
     }
 
     /**
@@ -1573,7 +1590,7 @@ public class DataNode extends Configured
             HdfsConstants.SMALL_BUFFER_SIZE));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, true, DataNode.this, null);
+            false, false, true, DataNode.this, null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //
@@ -1586,7 +1603,7 @@ public class DataNode extends Configured
         }
 
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum());
+            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);
@@ -2436,7 +2453,7 @@ public class DataNode extends Configured
     return dxcs.balanceThrottler.getBandwidth();
   }
   
-  DNConf getDnConf() {
+  public DNConf getDnConf() {
     return dnConf;
   }
 

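One small but useful DataNode change above is the JvmPauseMonitor wiring: create and start the monitor during startup, and stop it behind a null check during shutdown, since shutdown can run after a partial startup. The lifecycle in sketch form (the surrounding service class is illustrative; the JvmPauseMonitor calls are the ones used in the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.JvmPauseMonitor;

    /** Sketch of the pause-monitor lifecycle the patch adds to DataNode. */
    class PauseMonitoredService {
      private JvmPauseMonitor pauseMonitor;

      void startUp(Configuration conf) {
        // Logs a warning whenever the JVM itself stalls, e.g. in a long
        // GC pause, which otherwise shows up only as opaque timeouts.
        pauseMonitor = new JvmPauseMonitor(conf);
        pauseMonitor.start();
      }

      void shutDown() {
        // Null check mirrors the patch: shutdown may run after a failed,
        // partial startup in which the monitor was never created.
        if (pauseMonitor != null) {
          pauseMonitor.stop();
        }
      }
    }
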
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Aug 12 21:25:49 2013
@@ -299,7 +299,8 @@ class DataXceiver extends Receiver imple
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException {
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientName;
 
     OutputStream baseStream = getOutputStream();
@@ -324,7 +325,8 @@ class DataXceiver extends Receiver imple
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, false, sendChecksum, datanode, clientTraceFmt);
+            true, false, sendChecksum, datanode, clientTraceFmt,
+            cachingStrategy);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e; 
         LOG.info(msg);
@@ -393,7 +395,8 @@ class DataXceiver extends Receiver imple
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      DataChecksum requestedChecksum) throws IOException {
+      DataChecksum requestedChecksum,
+      CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -452,7 +455,8 @@ class DataXceiver extends Receiver imple
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-            clientname, srcDataNode, datanode, requestedChecksum);
+            clientname, srcDataNode, datanode, requestedChecksum,
+            cachingStrategy);
       } else {
         datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
@@ -497,7 +501,8 @@ class DataXceiver extends Receiver imple
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
+              cachingStrategy);
 
           mirrorOut.flush();
 
@@ -715,7 +720,7 @@ class DataXceiver extends Receiver imple
     try {
       // check if the block exists or not
       blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, 
-          null);
+          null, CachingStrategy.newDropBehind());
 
       // set up response stream
       OutputStream baseStream = getOutputStream();
@@ -846,7 +851,8 @@ class DataXceiver extends Receiver imple
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode, remoteChecksum);
+          null, 0, 0, 0, "", null, datanode, remoteChecksum,
+          CachingStrategy.newDropBehind());
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 

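CachingStrategy itself is not part of this diff, but the call sites above pin down most of its surface: a (Boolean, Long) constructor, nullable getDropBehind() and getReadahead(), and a newDropBehind() factory used for intra-cluster copies and block checksums. The following is a plausible reconstruction to make the hunks above easier to read, not the actual class:

    /** Plausible reconstruction of CachingStrategy from its call sites. */
    public class CachingStrategy {
      private final Boolean dropBehind; // null = no client preference
      private final Long readahead;     // null = no client preference

      public CachingStrategy(Boolean dropBehind, Long readahead) {
        this.dropBehind = dropBehind;
        this.readahead = readahead;
      }

      /** Used above for replication, copies, and checksums: always drop. */
      public static CachingStrategy newDropBehind() {
        return new CachingStrategy(Boolean.TRUE, null);
      }

      public Boolean getDropBehind() { return dropBehind; }
      public Long getReadahead() { return readahead; }

      @Override
      public String toString() {
        return "CachingStrategy(dropBehind=" + dropBehind
            + ", readahead=" + readahead + ")";
      }
    }
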
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Mon Aug 12 21:25:49 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
 
 /** 
  * This class defines a replica in a pipeline, which
@@ -150,11 +151,16 @@ public class ReplicaInPipeline extends R
    * Interrupt the writing thread and wait until it dies
    * @throws IOException the waiting is interrupted
    */
-  public void stopWriter() throws IOException {
+  public void stopWriter(long xceiverStopTimeout) throws IOException {
     if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
       writer.interrupt();
       try {
-        writer.join();
+        writer.join(xceiverStopTimeout);
+        if (writer.isAlive()) {
+          final String msg = "Join on writer thread " + writer + " timed out";
+          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(writer));
+          throw new IOException(msg);
+        }
       } catch (InterruptedException e) {
         throw new IOException("Waiting for writer thread is interrupted.");
       }

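stopWriter() above replaces an unbounded Thread.join() with a bounded join plus a liveness check, so a hung writer can no longer wedge block recovery indefinitely. The same pattern in isolation (plain Java; the class and method names are illustrative):

    import java.io.IOException;

    /** Sketch: interrupt a worker, wait at most timeoutMs for it to die. */
    final class BoundedStop {
      static void stop(Thread worker, long timeoutMs) throws IOException {
        if (worker == null || worker == Thread.currentThread()
            || !worker.isAlive()) {
          return; // nothing to stop, or we would be joining ourselves
        }
        worker.interrupt();
        try {
          worker.join(timeoutMs);
          if (worker.isAlive()) {
            // join(timeout) returns with the thread still alive only when
            // the timeout elapsed; fail loudly instead of blocking forever.
            throw new IOException("Join on thread " + worker + " timed out");
          }
        } catch (InterruptedException e) {
          throw new IOException("Interrupted while stopping " + worker);
        }
      }
    }
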
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Aug 12 21:25:49 2013
@@ -76,7 +76,6 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
@@ -615,7 +614,7 @@ class FsDatasetImpl implements FsDataset
     if (replicaInfo.getState() == ReplicaState.RBW) {
       ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
       // kill the previous writer
-      rbw.stopWriter();
+      rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
       rbw.setWriter(Thread.currentThread());
       // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
       if (replicaLen != rbw.getBytesOnDisk() 
@@ -735,7 +734,7 @@ class FsDatasetImpl implements FsDataset
     LOG.info("Recovering " + rbw);
 
     // Stop the previous writer
-    rbw.stopWriter();
+    rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
     rbw.setWriter(Thread.currentThread());
 
     // check generation stamp
@@ -1451,13 +1450,14 @@ class FsDatasetImpl implements FsDataset
   @Override // FsDatasetSpi
   public synchronized ReplicaRecoveryInfo initReplicaRecovery(
       RecoveringBlock rBlock) throws IOException {
-    return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(),
-        volumeMap, rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp());
+    return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap,
+        rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(),
+        datanode.getDnConf().getXceiverStopTimeout());
   }
 
   /** static version of {@link #initReplicaRecovery(Block, long)}. */
-  static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
-      ReplicaMap map, Block block, long recoveryId) throws IOException {
+  static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
+      Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
     final ReplicaInfo replica = map.get(bpid, block.getBlockId());
     LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
         + ", replica=" + replica);
@@ -1470,7 +1470,7 @@ class FsDatasetImpl implements FsDataset
     //stop writer if there is any
     if (replica instanceof ReplicaInPipeline) {
       final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
-      rip.stopWriter();
+      rip.stopWriter(xceiverStopTimeout);
 
       //check replica bytes on disk.
       if (rip.getBytesOnDisk() < rip.getVisibleLength()) {

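Both recovery paths above follow one take-over protocol: stop the previous writer, now with the bounded wait sketched after the ReplicaInPipeline hunk, then register the current thread as the new writer before touching replica state. In sketch form (the Replica interface is a hypothetical stand-in for ReplicaBeingWritten/ReplicaInPipeline):

    import java.io.IOException;

    /** Sketch of the writer take-over protocol used by recovery. */
    class WriterTakeOver {
      interface Replica {
        void stopWriter(long timeoutMs) throws IOException;
        void setWriter(Thread t);
      }

      static void takeOver(Replica rbw, long xceiverStopTimeout)
          throws IOException {
        // A bounded stop keeps a hung writer from wedging recovery;
        // it throws rather than waiting indefinitely.
        rbw.stopWriter(xceiverStopTimeout);
        // Only once the old writer is gone does recovery own the replica.
        rbw.setWriter(Thread.currentThread());
      }
    }
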
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Mon Aug 12 21:25:49 2013
@@ -99,9 +99,19 @@ class FsVolumeList {
   }
   
   void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
+    long totalStartTime = System.currentTimeMillis();
     for (FsVolumeImpl v : volumes) {
+      FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
+          " on volume " + v + "...");
+      long startTime = System.currentTimeMillis();
       v.getVolumeMap(bpid, volumeMap);
-    }
+      long timeTaken = System.currentTimeMillis() - startTime;
+      FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
+          " on volume " + v + ": " + timeTaken + "ms");
+    }
+    long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
+    FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
+        + totalTimeTaken + "ms");
   }
     
   /**
@@ -150,10 +160,47 @@ class FsVolumeList {
   }
 
 
-  void addBlockPool(String bpid, Configuration conf) throws IOException {
-    for (FsVolumeImpl v : volumes) {
-      v.addBlockPool(bpid, conf);
+  void addBlockPool(final String bpid, final Configuration conf) throws IOException {
+    long totalStartTime = System.currentTimeMillis();
+    
+    final List<IOException> exceptions = Collections.synchronizedList(
+        new ArrayList<IOException>());
+    List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
+    for (final FsVolumeImpl v : volumes) {
+      Thread t = new Thread() {
+        public void run() {
+          try {
+            FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
+                " on volume " + v + "...");
+            long startTime = System.currentTimeMillis();
+            v.addBlockPool(bpid, conf);
+            long timeTaken = System.currentTimeMillis() - startTime;
+            FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
+                " on " + v + ": " + timeTaken + "ms");
+          } catch (IOException ioe) {
+            FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
+                ". Will throw later.", ioe);
+            exceptions.add(ioe);
+          }
+        }
+      };
+      blockPoolAddingThreads.add(t);
+      t.start();
     }
+    for (Thread t : blockPoolAddingThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      throw exceptions.get(0);
+    }
+    
+    long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
+    FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
+        bpid + ": " + totalTimeTaken + "ms");
   }
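addBlockPool() above now scans every volume in its own thread, collects IOExceptions in a synchronized list, and rethrows the first one only after all scanners have joined, so one bad volume cannot abort or skip the others. The scatter/join pattern, condensed (the Volume interface and class name are illustrative; like the patch, this keeps raw threads rather than an executor):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class ParallelVolumeScan {
      interface Volume { void scan(String bpid) throws IOException; }

      static void scanAll(List<Volume> volumes, final String bpid)
          throws IOException {
        final List<IOException> exceptions =
            Collections.synchronizedList(new ArrayList<IOException>());
        List<Thread> threads = new ArrayList<Thread>();
        for (final Volume v : volumes) {
          Thread t = new Thread() {
            public void run() {
              try {
                v.scan(bpid); // each volume scanned concurrently
              } catch (IOException ioe) {
                exceptions.add(ioe); // remember; rethrown after the join
              }
            }
          };
          threads.add(t);
          t.start();
        }
        for (Thread t : threads) {
          try {
            t.join(); // wait for every scanner, even after a failure
          } catch (InterruptedException ie) {
            throw new IOException(ie);
          }
        }
        if (!exceptions.isEmpty()) {
          throw exceptions.get(0);
        }
      }
    }
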
   
   void removeBlockPool(String bpid) {