Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/04/27 01:50:20 UTC

svn commit: r1476453 - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/mai...

Author: szetszwo
Date: Fri Apr 26 23:50:17 2013
New Revision: 1476453

URL: http://svn.apache.org/r1476453
Log:
Merge r1476010 through r1476452 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
      - copied unchanged from r1476452, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
      - copied unchanged from r1476452, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1476010-1476452

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Apr 26 23:50:17 2013
@@ -22,6 +22,9 @@ Trunk (Unreleased)
     Azure environments. (See breakdown of tasks below for subtasks and
     contributors)
 
+    HDFS-2576. Enhances the DistributedFileSystem's create API so that clients
+    can specify favored datanodes for a file's blocks. (ddas)
+
   IMPROVEMENTS
 
     HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@@ -262,6 +265,9 @@ Trunk (Unreleased)
     HDFS-4757. Update FSDirectory#inodeMap when replacing an INodeDirectory
     while setting quota.  (Jing Zhao via szetszwo)
 
+    HDFS-4761. When resetting FSDirectory, the inodeMap should also be reset.
+    (Jing Zhao via szetszwo)
+
   BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
 
     HDFS-4145. Merge hdfs cmd line scripts from branch-1-win. (David Lao,
@@ -457,6 +463,10 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4346. Add SequentialNumber as a base class for INodeId and
     GenerationStamp.  (szetszwo)
 
+    HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing
+    the datanode with the most recent heartbeat as the primary.  (Varun Sharma
+    via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1476010-1476452

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Apr 26 23:50:17 2013
@@ -1210,7 +1210,7 @@ public class DFSClient implements java.i
                              ChecksumOpt checksumOpt)
       throws IOException {
     return create(src, permission, flag, true,
-        replication, blockSize, progress, buffersize, checksumOpt);
+        replication, blockSize, progress, buffersize, checksumOpt, null);
   }
 
   /**
@@ -1244,6 +1244,29 @@ public class DFSClient implements java.i
                              Progressable progress,
                              int buffersize,
                              ChecksumOpt checksumOpt) throws IOException {
+    return create(src, permission, flag, createParent, replication, blockSize, 
+        progress, buffersize, checksumOpt, null);
+  }
+
+  /**
+   * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
+   * Progressable, int, ChecksumOpt)} with the addition of favoredNodes, a
+   * hint to the namenode about where to place the file's blocks.
+   * The favored nodes hint is not persisted in HDFS, so it may be honored
+   * only at creation time; HDFS may later move the blocks off the favored
+   * nodes, e.g. during balancing or re-replication. A value of null means
+   * no favored nodes for this create.
+   */
+  public DFSOutputStream create(String src, 
+                             FsPermission permission,
+                             EnumSet<CreateFlag> flag, 
+                             boolean createParent,
+                             short replication,
+                             long blockSize,
+                             Progressable progress,
+                             int buffersize,
+                             ChecksumOpt checksumOpt,
+                             InetSocketAddress[] favoredNodes) throws IOException {
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getFileDefault();
@@ -1252,9 +1275,18 @@ public class DFSClient implements java.i
     if(LOG.isDebugEnabled()) {
       LOG.debug(src + ": masked=" + masked);
     }
+    String[] favoredNodeStrs = null;
+    if (favoredNodes != null) {
+      favoredNodeStrs = new String[favoredNodes.length];
+      for (int i = 0; i < favoredNodes.length; i++) {
+        favoredNodeStrs[i] = 
+            favoredNodes[i].getAddress().getHostAddress() + ":" 
+                         + favoredNodes[i].getPort();
+      }
+    }
     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
         src, masked, flag, createParent, replication, blockSize, progress,
-        buffersize, dfsClientConf.createChecksum(checksumOpt));
+        buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
     beginFileLease(src, result);
     return result;
   }
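
For reference, a minimal sketch of calling the new overload; the path, addresses, and sizes are made up for illustration, and the null progress/checksumOpt arguments assume the defaults are acceptable:

    // Hypothetical caller; favored nodes are given as InetSocketAddress and
    // converted internally to "ip:port" strings before reaching the namenode.
    InetSocketAddress[] favoredNodes = {
        new InetSocketAddress("192.168.0.10", 50010),
        new InetSocketAddress("192.168.0.11", 50010)
    };
    DFSOutputStream out = dfsClient.create("/tmp/favored-demo",
        FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
        true /*createParent*/, (short) 3, 128L * 1024 * 1024 /*blockSize*/,
        null /*progress*/, 4096 /*buffersize*/, null /*checksumOpt*/,
        favoredNodes);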

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Apr 26 23:50:17 2013
@@ -315,6 +315,7 @@ public class DFSOutputStream extends FSO
             return key;
           }
         });
+    private String[] favoredNodes;
     volatile boolean hasError = false;
     volatile int errorIndex = -1;
     private BlockConstructionStage stage;  // block construction stage
@@ -391,7 +392,11 @@ public class DFSOutputStream extends FSO
 
       }
     }
-    
+
+    private void setFavoredNodes(String[] favoredNodes) {
+      this.favoredNodes = favoredNodes;
+    }
+
     /**
      * Initialize for data streaming
      */
@@ -1177,7 +1182,7 @@ public class DFSOutputStream extends FSO
         while (true) {
           try {
             return dfsClient.namenode.addBlock(src, dfsClient.clientName,
-                block, excludedNodes, fileId);
+                block, excludedNodes, fileId, favoredNodes);
           } catch (RemoteException e) {
             IOException ue = 
               e.unwrapRemoteException(FileNotFoundException.class,
@@ -1318,7 +1323,7 @@ public class DFSOutputStream extends FSO
   /** Construct a new output stream for creating a file. */
   private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
-      DataChecksum checksum) throws IOException {
+      DataChecksum checksum, String[] favoredNodes) throws IOException {
     this(dfsClient, src, progress, stat, checksum);
     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
@@ -1326,12 +1331,15 @@ public class DFSOutputStream extends FSO
         checksum.getBytesPerChecksum());
 
     streamer = new DataStreamer();
+    if (favoredNodes != null && favoredNodes.length != 0) {
+      streamer.setFavoredNodes(favoredNodes);
+    }
   }
 
   static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
       FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
       short replication, long blockSize, Progressable progress, int buffersize,
-      DataChecksum checksum) throws IOException {
+      DataChecksum checksum, String[] favoredNodes) throws IOException {
     final HdfsFileStatus stat;
     try {
       stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
@@ -1349,11 +1357,19 @@ public class DFSOutputStream extends FSO
                                      SnapshotAccessControlException.class);
     }
     final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
-        flag, progress, checksum);
+        flag, progress, checksum, favoredNodes);
     out.start();
     return out;
   }
 
+  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
+      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
+      short replication, long blockSize, Progressable progress, int buffersize,
+      DataChecksum checksum) throws IOException {
+    return newStreamForCreate(dfsClient, src, masked, flag, createParent, replication,
+        blockSize, progress, buffersize, checksum, null);
+  }
+
   /** Construct a new output stream for append. */
   private DFSOutputStream(DFSClient dfsClient, String src,
       Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Apr 26 23:50:17 2013
@@ -268,6 +268,27 @@ public class DistributedFileSystem exten
             : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
         blockSize, progress, null);
   }
+
+  /**
+   * Same as
+   * {@link #create(Path, FsPermission, boolean, int, short, long,
+   * Progressable)} with the addition of favoredNodes, a hint to the
+   * namenode about where to place the file's blocks.
+   * The favored nodes hint is not persisted in HDFS, so it may be honored
+   * only at creation time; HDFS may later move the blocks off the favored
+   * nodes, e.g. during balancing or re-replication. A value of null means
+   * no favored nodes for this create.
+   */
+  public HdfsDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress, InetSocketAddress[] favoredNodes) throws IOException {
+    statistics.incrementWriteOps(1);
+    final DFSOutputStream out = dfs.create(getPathName(f), permission,
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE),
+        true, replication, blockSize, progress, bufferSize, null, favoredNodes);
+    return new HdfsDataOutputStream(out, statistics);
+  }
   
   @Override
   public HdfsDataOutputStream create(Path f, FsPermission permission,
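
The same hint at the FileSystem level, as a sketch; the path and address are illustrative, and conf is assumed to point at an HDFS cluster:

    // Hypothetical caller of the new DistributedFileSystem#create overload.
    DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(conf);
    InetSocketAddress[] favored = {
        new InetSocketAddress("192.168.0.10", 50010)
    };
    HdfsDataOutputStream out = fs.create(new Path("/tmp/favored-demo"),
        FsPermission.getFileDefault(), true /*overwrite*/, 4096 /*bufferSize*/,
        (short) 3, 128L * 1024 * 1024, null /*progress*/, favored);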

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Apr 26 23:50:17 2013
@@ -306,6 +306,8 @@ public interface ClientProtocol {
    * @param excludeNodes a list of nodes that should not be
    * allocated for the current block
    * @param fileId the id uniquely identifying a file
+   * @param favoredNodes the list of nodes where the client wants the blocks.
+   *          Nodes are identified by either host name or address.
    *
    * @return LocatedBlock allocated block information.
    *
@@ -320,7 +322,8 @@ public interface ClientProtocol {
    */
   @Idempotent
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
+      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, 
+      String[] favoredNodes)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Apr 26 23:50:17 2013
@@ -382,12 +382,15 @@ public class ClientNamenodeProtocolServe
     
     try {
       List<DatanodeInfoProto> excl = req.getExcludeNodesList();
+      List<String> favor = req.getFavoredNodesList();
       LocatedBlock result = server.addBlock(
           req.getSrc(),
           req.getClientName(),
           req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
           (excl == null || excl.size() == 0) ? null : PBHelper.convert(excl
-              .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId());
+              .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(),
+          (favor == null || favor.size() == 0) ? null : favor
+              .toArray(new String[favor.size()]));
       return AddBlockResponseProto.newBuilder()
           .setBlock(PBHelper.convert(result)).build();
     } catch (IOException e) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Fri Apr 26 23:50:17 2013
@@ -312,7 +312,8 @@ public class ClientNamenodeProtocolTrans
   
   @Override
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
+      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+      String[] favoredNodes)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException {
@@ -322,6 +323,9 @@ public class ClientNamenodeProtocolTrans
       req.setPrevious(PBHelper.convert(previous)); 
     if (excludeNodes != null) 
       req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
+    if (favoredNodes != null) {
+      req.addAllFavoredNodes(Arrays.asList(favoredNodes));
+    }
     try {
       return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
     } catch (ServiceException e) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Fri Apr 26 23:50:17 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -41,7 +42,10 @@ public class BlockInfoUnderConstruction 
    */
   private List<ReplicaUnderConstruction> replicas;
 
-  /** A data-node responsible for block recovery. */
+  /**
+   * Index of the primary data node doing the recovery. Useful for log
+   * messages.
+   */
   private int primaryNodeIndex = -1;
 
   /**
@@ -62,6 +66,7 @@ public class BlockInfoUnderConstruction 
   static class ReplicaUnderConstruction extends Block {
     private DatanodeDescriptor expectedLocation;
     private ReplicaState state;
+    private boolean chosenAsPrimary;
 
     ReplicaUnderConstruction(Block block,
                              DatanodeDescriptor target,
@@ -69,6 +74,7 @@ public class BlockInfoUnderConstruction 
       super(block);
       this.expectedLocation = target;
       this.state = state;
+      this.chosenAsPrimary = false;
     }
 
     /**
@@ -89,6 +95,13 @@ public class BlockInfoUnderConstruction 
     }
 
     /**
+     * Whether the replica was chosen for recovery.
+     */
+    boolean getChosenAsPrimary() {
+      return chosenAsPrimary;
+    }
+
+    /**
      * Set replica state.
      */
     void setState(ReplicaState s) {
@@ -96,6 +109,13 @@ public class BlockInfoUnderConstruction 
     }
 
     /**
+     * Set whether this replica was chosen for recovery.
+     */
+    void setChosenAsPrimary(boolean chosenAsPrimary) {
+      this.chosenAsPrimary = chosenAsPrimary;
+    }
+
+    /**
      * Is data-node the replica belongs to alive.
      */
     boolean isAlive() {
@@ -237,19 +257,40 @@ public class BlockInfoUnderConstruction 
         + " BlockInfoUnderConstruction.initLeaseRecovery:"
         + " No blocks found, lease removed.");
     }
-
-    int previous = primaryNodeIndex;
-    for(int i = 1; i <= replicas.size(); i++) {
-      int j = (previous + i)%replicas.size();
-      if (replicas.get(j).isAlive()) {
-        primaryNodeIndex = j;
-        DatanodeDescriptor primary = replicas.get(j).getExpectedLocation(); 
-        primary.addBlockToBeRecovered(this);
-        NameNode.blockStateChangeLog.info("BLOCK* " + this
-          + " recovery started, primary=" + primary);
-        return;
+    boolean allLiveReplicasTriedAsPrimary = true;
+    for (int i = 0; i < replicas.size(); i++) {
+      // Check if all replicas have been tried or not.
+      if (replicas.get(i).isAlive()) {
+        allLiveReplicasTriedAsPrimary =
+            (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
+      }
+    }
+    if (allLiveReplicasTriedAsPrimary) {
+      // All live replicas have been tried; reset the chosen flags so that
+      // every replica, alive or not, becomes eligible again.
+      for (int i = 0; i < replicas.size(); i++) {
+        replicas.get(i).setChosenAsPrimary(false);
       }
     }
+    long mostRecentLastUpdate = 0;
+    ReplicaUnderConstruction primary = null;
+    primaryNodeIndex = -1;
+    for(int i = 0; i < replicas.size(); i++) {
+      // Skip replicas that are dead or have already been tried as primary.
+      if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
+        continue;
+      }
+      if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
+        primary = replicas.get(i);
+        primaryNodeIndex = i;
+        mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
+      }
+    }
+    if (primary != null) {
+      primary.getExpectedLocation().addBlockToBeRecovered(this);
+      primary.setChosenAsPrimary(true);
+      NameNode.blockStateChangeLog.info("BLOCK* " + this
+        + " recovery started, primary=" + primary);
+    }
   }
 
   void addReplicaIfNotPresent(DatanodeDescriptor dn,
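
In short, the new rule is: among live replicas that have not yet been tried as primary, pick the one whose datanode heartbeated most recently; once every live replica has been tried, clear the flags and start over. A self-contained sketch of that rule, where Replica is a simplified stand-in for ReplicaUnderConstruction (the real code reads the heartbeat via DatanodeDescriptor#getLastUpdate()):

    static Replica choosePrimary(List<Replica> replicas) {
      // Have all live replicas already been tried as primary?
      boolean allTried = true;
      for (Replica r : replicas) {
        if (r.isAlive() && !r.chosenAsPrimary()) {
          allTried = false;
          break;
        }
      }
      if (allTried) {
        // Reset the flags so every replica becomes eligible again.
        for (Replica r : replicas) {
          r.setChosenAsPrimary(false);
        }
      }
      // Pick the eligible replica with the most recent heartbeat.
      Replica primary = null;
      long mostRecent = 0;
      for (Replica r : replicas) {
        if (!r.isAlive() || r.chosenAsPrimary()) {
          continue;  // skip dead replicas and ones already tried
        }
        if (r.lastHeartbeat() > mostRecent) {
          primary = r;
          mostRecent = r.lastHeartbeat();
        }
      }
      if (primary != null) {
        primary.setChosenAsPrimary(true);
      }
      return primary;  // may be null if no replica is eligible
    }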

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Apr 26 23:50:17 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
@@ -1333,11 +1335,12 @@ public class BlockManager {
   public DatanodeDescriptor[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
       final HashMap<Node, Node> excludedNodes,
-      final long blocksize) throws IOException {
-    // choose targets for the new block to be allocated.
+      final long blocksize, List<String> favoredNodes) throws IOException {
+    List<DatanodeDescriptor> favoredDatanodeDescriptors = 
+        getDatanodeDescriptors(favoredNodes);
     final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
-        numOfReplicas, client, new ArrayList<DatanodeDescriptor>(), false,
-        excludedNodes, blocksize);
+        numOfReplicas, client, excludedNodes, blocksize, 
+        favoredDatanodeDescriptors);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="
@@ -1351,6 +1354,24 @@ public class BlockManager {
   }
 
   /**
+   * Get list of datanode descriptors for given list of nodes. Nodes are
+   * hostaddress:port or just hostaddress.
+   */
+  List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
+    List<DatanodeDescriptor> datanodeDescriptors = null;
+    if (nodes != null) {
+      datanodeDescriptors = new ArrayList<DatanodeDescriptor>(nodes.size());
+      for (int i = 0; i < nodes.size(); i++) {
+        DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
+        if (node != null) {
+          datanodeDescriptors.add(node);
+        }
+      }
+    }
+    return datanodeDescriptors;
+  }
+
+  /**
    * Parse the data-nodes the block belongs to and choose one,
    * which will be the replication source.
    *

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Fri Apr 26 23:50:17 2013
@@ -118,6 +118,25 @@ public abstract class BlockPlacementPoli
     return chooseTarget(srcBC.getName(), numOfReplicas, writer,
                         chosenNodes, false, excludedNodes, blocksize);
   }
+  
+  /**
+   * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, 
+   * HashMap, long)} with the added parameter {@code favoredNodes}.
+   * @param favoredNodes datanodes that should be favored as targets. This
+   *          is only a hint; depending on cluster state, the namenode may
+   *          not be able to place the blocks on these datanodes.
+   */
+  DatanodeDescriptor[] chooseTarget(String src,
+      int numOfReplicas, DatanodeDescriptor writer,
+      HashMap<Node, Node> excludedNodes,
+      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+    // This base class does not implement favored-node placement; the
+    // implementations of this class are expected to override this method
+    // to provide it. Here we simply fall back to the regular chooseTarget.
+    return chooseTarget(src, numOfReplicas, writer, 
+        new ArrayList<DatanodeDescriptor>(numOfReplicas), false, excludedNodes, 
+        blocksize);
+  }
 
   /**
    * Verify that the block is replicated on at least minRacks different racks

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Apr 26 23:50:17 2013
@@ -125,6 +125,60 @@ public class BlockPlacementPolicyDefault
         excludedNodes, blocksize);
   }
 
+  @Override
+  DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas,
+      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
+      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+    try {
+      if (favoredNodes == null || favoredNodes.size() == 0) {
+        // Favored nodes not specified, fall back to regular block placement.
+        return chooseTarget(src, numOfReplicas, writer,
+            new ArrayList<DatanodeDescriptor>(numOfReplicas), false, 
+            excludedNodes, blocksize);
+      }
+
+      HashMap<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
+          new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
+
+      // Choose favored nodes
+      List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
+      boolean avoidStaleNodes = stats != null
+          && stats.isAvoidingStaleDataNodesForWrite();
+      for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
+        DatanodeDescriptor favoredNode = favoredNodes.get(i);
+        // Choose a single node which is local to favoredNode.
+        // 'results' is updated within chooseLocalNode
+        DatanodeDescriptor target = chooseLocalNode(favoredNode,
+            favoriteAndExcludedNodes, blocksize, 
+            getMaxNodesPerRack(results, 
+                numOfReplicas)[1], results, avoidStaleNodes);
+        if (target == null) {
+          LOG.warn("Could not find a target for file " + src
+              + " with favored node " + favoredNode); 
+          continue;
+        }
+        favoriteAndExcludedNodes.put(target, target);
+      }
+
+      if (results.size() < numOfReplicas) {        
+        // Not enough favored nodes, choose other nodes.
+        numOfReplicas -= results.size();
+        DatanodeDescriptor[] remainingTargets = 
+            chooseTarget(src, numOfReplicas, writer, results,
+                false, favoriteAndExcludedNodes, blocksize);
+        for (int i = 0; i < remainingTargets.length; i++) {
+          results.add(remainingTargets[i]);
+        }
+      }
+      return results.toArray(new DatanodeDescriptor[results.size()]);
+    } catch (NotEnoughReplicasException nr) {
+      // Fall back to regular block placement disregarding favored nodes hint
+      return chooseTarget(src, numOfReplicas, writer, 
+          new ArrayList<DatanodeDescriptor>(numOfReplicas), false, 
+          excludedNodes, blocksize);
+    }
+  }
+
   /** This is the implementation. */
   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
                                     DatanodeDescriptor writer,
@@ -140,15 +194,9 @@ public class BlockPlacementPolicyDefault
       excludedNodes = new HashMap<Node, Node>();
     }
      
-    int clusterSize = clusterMap.getNumOfLeaves();
-    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
-    if (totalNumOfReplicas > clusterSize) {
-      numOfReplicas -= (totalNumOfReplicas-clusterSize);
-      totalNumOfReplicas = clusterSize;
-    }
-      
-    int maxNodesPerRack = 
-      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+    int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
+    numOfReplicas = result[0];
+    int maxNodesPerRack = result[1];
       
     List<DatanodeDescriptor> results = 
       new ArrayList<DatanodeDescriptor>(chosenNodes);
@@ -174,6 +222,18 @@ public class BlockPlacementPolicyDefault
     return getPipeline((writer==null)?localNode:writer,
                        results.toArray(new DatanodeDescriptor[results.size()]));
   }
+
+  private int[] getMaxNodesPerRack(List<DatanodeDescriptor> chosenNodes,
+      int numOfReplicas) {
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+    int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+    return new int[] {numOfReplicas, maxNodesPerRack};
+  }
     
   /* choose <i>numOfReplicas</i> from all data nodes */
   private DatanodeDescriptor chooseTarget(int numOfReplicas,
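
Two short notes on this section. First, the favored-node strategy above tries to place one replica local to each favored node (up to the replica count) and fills any remaining slots, or falls back entirely, via the default policy. Second, a worked example of the getMaxNodesPerRack arithmetic: for a new block with numOfReplicas = 3, no chosen nodes, and a 2-rack cluster of at least 3 nodes, totalNumOfReplicas = 3 and maxNodesPerRack = (3-1)/2 + 2 = 3 (integer division); with 3 racks it drops to (3-1)/3 + 2 = 2, spreading the replicas across racks.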

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Apr 26 23:50:17 2013
@@ -213,7 +213,7 @@ public class DatanodeManager {
         " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
         "It should be a positive non-zero float value, not greater than 1.0f.");
   }
-  
+
   private static long getStaleIntervalFromConf(Configuration conf,
       long heartbeatExpireInterval) {
     long staleInterval = conf.getLong(
@@ -326,6 +326,68 @@ public class DatanodeManager {
     return host2DatanodeMap.getDatanodeByHost(host);
   }
 
+  /** @return the datanode descriptor matching the given host and xfer port. */
+  public DatanodeDescriptor getDatanodeByXferAddr(String host, int xferPort) {
+    return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
+  }
+
+  /**
+   * Given a datanode address or host name, returns the DatanodeDescriptor
+   * for that datanode. If no exact match is found, it falls back to a
+   * machine-local datanode, then to a rack-local one, and finally to a
+   * random node in the cluster.
+   *
+   * @param address hostaddress:xferPort, or just hostaddress
+   * @return the best match for the given datanode
+   */
+  DatanodeDescriptor getDatanodeDescriptor(String address) {
+    DatanodeDescriptor node = null;
+    int colon = address.indexOf(":");
+    int xferPort;
+    String host = address;
+    if (colon > 0) {
+      host = address.substring(0, colon);
+      xferPort = Integer.parseInt(address.substring(colon+1));
+      node = getDatanodeByXferAddr(host, xferPort);
+    }
+    if (node == null) {
+      node = getDatanodeByHost(host);
+    }
+    if (node == null) {
+      String networkLocation = resolveNetworkLocation(host);
+
+      // If the current cluster doesn't contain the node, fallback to
+      // something machine local and then rack local.
+      List<Node> rackNodes = getNetworkTopology()
+                                   .getDatanodesInRack(networkLocation);
+      if (rackNodes != null) {
+        // Try something machine local.
+        for (Node rackNode : rackNodes) {
+          if (((DatanodeDescriptor) rackNode).getIpAddr().equals(host)) {
+            node = (DatanodeDescriptor) rackNode;
+            break;
+          }
+        }
+
+        // Try something rack local.
+        if (node == null && !rackNodes.isEmpty()) {
+          node = (DatanodeDescriptor) (rackNodes
+              .get(DFSUtil.getRandom().nextInt(rackNodes.size())));
+        }
+      }
+
+      // If we can't even choose rack local, just choose any node in the
+      // cluster.
+      if (node == null) {
+        node = (DatanodeDescriptor)getNetworkTopology()
+                                   .chooseRandom(NodeBase.ROOT);
+      }
+    }
+    return node;
+  }
+
+
   /** Get a datanode descriptor given corresponding storageID */
   DatanodeDescriptor getDatanode(final String storageID) {
     return datanodeMap.get(storageID);
@@ -455,8 +517,13 @@ public class DatanodeManager {
     }
   }
 
+  public String resolveNetworkLocation(String host) {
+    DatanodeID d = parseDNFromHostsEntry(host);
+    return resolveNetworkLocation(d);
+  }
+
   /* Resolve a node's network location */
-  private void resolveNetworkLocation (DatanodeDescriptor node) {
+  private String resolveNetworkLocation (DatanodeID node) {
     List<String> names = new ArrayList<String>(1);
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
       names.add(node.getIpAddr());
@@ -474,7 +541,7 @@ public class DatanodeManager {
     } else {
       networkLocation = rName.get(0);
     }
-    node.setNetworkLocation(networkLocation);
+    return networkLocation;
   }
 
   private boolean inHostsList(DatanodeID node) {
@@ -707,7 +774,7 @@ public class DatanodeManager {
           nodeS.setDisallowed(false); // Node is in the include list
           
           // resolve network location
-          resolveNetworkLocation(nodeS);
+          nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
           getNetworkTopology().add(nodeS);
             
           // also treat the registration message as a heartbeat
@@ -739,7 +806,7 @@ public class DatanodeManager {
         = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
       boolean success = false;
       try {
-        resolveNetworkLocation(nodeDescr);
+        nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
         networktopology.add(nodeDescr);
   
         // register new datanode
@@ -875,7 +942,7 @@ public class DatanodeManager {
         (numStaleNodes <= heartbeatManager.getLiveDatanodeCount()
             * ratioUseStaleDataNodesForWrite);
   }
-  
+
   /**
    * @return The time interval used to mark DataNodes as stale.
    */
@@ -1093,7 +1160,7 @@ public class DatanodeManager {
    * failed.  As a special case, the loopback address is also considered
    * acceptable.  This is particularly important on Windows, where 127.0.0.1 does
    * not resolve to "localhost".
-   * 
+   *
    * @param address InetAddress to check
    * @return boolean true if name resolution successful or address is loopback
    */
@@ -1127,7 +1194,7 @@ public class DatanodeManager {
           setDatanodeDead(nodeinfo);
           throw new DisallowedDatanodeException(nodeinfo);
         }
-         
+
         if (nodeinfo == null || !nodeinfo.isAlive) {
           return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
@@ -1142,9 +1209,34 @@ public class DatanodeManager {
           BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
               blocks.length);
           for (BlockInfoUnderConstruction b : blocks) {
-            brCommand.add(new RecoveringBlock(
-                new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
-                    .getBlockRecoveryId()));
+            DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
+            // Skip stale nodes during recovery, i.e. nodes that have not
+            // heartbeated for some time (30s by default).
+            List<DatanodeDescriptor> recoveryLocations =
+                new ArrayList<DatanodeDescriptor>(expectedLocations.length);
+            for (int i = 0; i < expectedLocations.length; i++) {
+              if (!expectedLocations[i].isStale(this.staleInterval)) {
+                recoveryLocations.add(expectedLocations[i]);
+              }
+            }
+            // If we are left with 1 or fewer replicas after eliminating stale
+            // nodes, choose all replicas for recovery and let the primary
+            // data node handle failures.
+            if (recoveryLocations.size() > 1) {
+              if (recoveryLocations.size() != expectedLocations.length) {
+                LOG.info("Skipped stale nodes for recovery : " +
+                    (expectedLocations.length - recoveryLocations.size()));
+              }
+              brCommand.add(new RecoveringBlock(
+                  new ExtendedBlock(blockPoolId, b),
+                  recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
+                  b.getBlockRecoveryId()));
+            } else {
+              // If too many replicas are stale, then choose all replicas to participate
+              // in block recovery.
+              brCommand.add(new RecoveringBlock(
+                  new ExtendedBlock(blockPoolId, b),
+                  expectedLocations,
+                  b.getBlockRecoveryId()));
+            }
           }
           return new DatanodeCommand[] { brCommand };
         }
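
As a worked example of the fallback: with the default stale interval of 30 seconds and three expected locations of which two have not heartbeated within that window, only one non-stale location remains; since that is not more than one, the namenode sends all three locations in the BlockRecoveryCommand and lets the primary datanode handle any failures.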

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Apr 26 23:50:17 2013
@@ -2452,6 +2452,8 @@ public class FSDirectory implements Clos
     try {
       setReady(false);
       rootDir = createRoot(getFSNamesystem());
+      inodeMap.clear();
+      addToInodeMapUnprotected(rootDir);
       nameCache.reset();
     } finally {
       writeUnlock();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Apr 26 23:50:17 2013
@@ -2228,7 +2228,8 @@ public class FSNamesystem implements Nam
    * client to "try again later".
    */
   LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
-      ExtendedBlock previous, HashMap<Node, Node> excludedNodes)
+      ExtendedBlock previous, HashMap<Node, Node> excludedNodes, 
+      List<String> favoredNodes)
       throws LeaseExpiredException, NotReplicatedYetException,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
       IOException {
@@ -2268,8 +2269,8 @@ public class FSNamesystem implements Nam
     }
 
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
-        src, replication, clientNode, excludedNodes, blockSize);
+    final DatanodeDescriptor targets[] = getBlockManager().chooseTarget( 
+        src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
 
     // Part II.
     // Allocate a new block, add it to the INode and the BlocksMap. 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Apr 26 23:50:17 2013
@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
@@ -484,7 +485,8 @@ class NameNodeRpcServer implements Namen
   
   @Override
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId)
+      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
+      String[] favoredNodes)
       throws IOException {
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
@@ -497,8 +499,10 @@ class NameNodeRpcServer implements Namen
         excludedNodesSet.put(node, node);
       }
     }
+    List<String> favoredNodesList = (favoredNodes == null) ? null
+        : Arrays.asList(favoredNodes);
     LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
-        clientName, previous, excludedNodesSet);
+        clientName, previous, excludedNodesSet, favoredNodesList);
     if (locatedBlock != null)
       metrics.incrAddBlockOps();
     return locatedBlock;

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1476010-1476452

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Fri Apr 26 23:50:17 2013
@@ -121,6 +121,7 @@ message AddBlockRequestProto {
   optional ExtendedBlockProto previous = 3;
   repeated DatanodeInfoProto excludeNodes = 4;
   optional uint64 fileId = 5 [default = 0];  // default as a bogus id
+  repeated string favoredNodes = 6; // hint: the datanodes the client would like to use for the block
 }
 
 message AddBlockResponseProto {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Apr 26 23:50:17 2013
@@ -1072,7 +1072,10 @@
     otherwise this may cause too frequent change of stale states. 
     We thus set a minimum stale interval value (the default value is 3 times 
     of heartbeat interval) and guarantee that the stale interval cannot be less
-    than the minimum value.
+    than the minimum value. A stale data node is avoided during lease/block
+    recovery. It can be conditionally avoided for reads (see
+    dfs.namenode.avoid.read.stale.datanode) and for writes (see
+    dfs.namenode.avoid.write.stale.datanode).
   </description>
 </property>
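
For completeness, a sketch of enabling the stale-node avoidance referenced above; these keys are normally set in hdfs-site.xml on the namenode, and the interval value shown is assumed to be the default (30s, in milliseconds):

    // Sketch only: programmatic equivalent of the hdfs-site.xml settings.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean("dfs.namenode.avoid.read.stale.datanode", true);
    conf.setBoolean("dfs.namenode.avoid.write.stale.datanode", true);
    conf.setLong("dfs.namenode.stale.datanode.interval", 30 * 1000L);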
 

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1476010-1476452

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1476010-1476452

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1476010-1476452

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1476010-1476452

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Apr 26 23:50:17 2013
@@ -241,7 +241,7 @@ public class TestDFSClientRetries {
                          anyString(),
                          any(ExtendedBlock.class),
                          any(DatanodeInfo[].class),
-                         anyLong())).thenAnswer(answer);
+                         anyLong(), any(String[].class))).thenAnswer(answer);
     
     Mockito.doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
@@ -390,7 +390,7 @@ public class TestDFSClientRetries {
         }
       }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
-          Mockito.anyLong());
+          Mockito.anyLong(), Mockito.<String[]> any());
 
       doAnswer(new Answer<Boolean>() {
 
@@ -432,7 +432,7 @@ public class TestDFSClientRetries {
       Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
           Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
-          Mockito.anyLong());
+          Mockito.anyLong(), Mockito.<String[]> any());
       Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
           Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock>any());

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java Fri Apr 26 23:50:17 2013
@@ -519,7 +519,7 @@ public class TestFileCreation {
 
       // add one block to the file
       LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
-          client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID);
+          client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID, null);
       System.out.println("testFileCreationError2: "
           + "Added block " + location.getBlock());
 
@@ -570,7 +570,7 @@ public class TestFileCreation {
       createFile(dfs, f, 3);
       try {
         cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName,
-            null, null, INodeId.GRANDFATHER_INODE_ID);
+            null, null, INodeId.GRANDFATHER_INODE_ID, null);
         fail();
       } catch(IOException ioe) {
         FileSystem.LOG.info("GOOD!", ioe);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java Fri Apr 26 23:50:17 2013
@@ -20,17 +20,21 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -56,14 +60,12 @@ public class TestHeartbeatHandling {
       final HeartbeatManager hm = namesystem.getBlockManager(
           ).getDatanodeManager().getHeartbeatManager();
       final String poolId = namesystem.getBlockPoolId();
-      final DatanodeRegistration nodeReg = 
+      final DatanodeRegistration nodeReg =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
-
-
       final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
-      
+
       final int REMAINING_BLOCKS = 1;
-      final int MAX_REPLICATE_LIMIT = 
+      final int MAX_REPLICATE_LIMIT =
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
       final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
       final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
@@ -83,7 +85,7 @@ public class TestHeartbeatHandling {
           assertEquals(1, cmds.length);
           assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
           assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
-          
+
           ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
           for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
             blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
@@ -122,4 +124,113 @@ public class TestHeartbeatHandling {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Test that {@link FSNamesystem#handleHeartbeat} correctly selects
+   * datanode targets for block recovery.
+   */
+  @Test
+  public void testHeartbeatBlockRecovery() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    try {
+      cluster.waitActive();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      final HeartbeatManager hm = namesystem.getBlockManager(
+          ).getDatanodeManager().getHeartbeatManager();
+      final String poolId = namesystem.getBlockPoolId();
+      final DatanodeRegistration nodeReg1 =
+        DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
+      final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
+      final DatanodeRegistration nodeReg2 =
+        DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
+      final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
+      final DatanodeRegistration nodeReg3 =
+        DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
+      final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
+
+      try {
+        namesystem.writeLock();
+        synchronized(hm) {
+          NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem);
+          NameNodeAdapter.sendHeartBeat(nodeReg2, dd2, namesystem);
+          NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);
+
+          // Test with all alive nodes.
+          dd1.setLastUpdate(System.currentTimeMillis());
+          dd2.setLastUpdate(System.currentTimeMillis());
+          dd3.setLastUpdate(System.currentTimeMillis());
+          BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
+              new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
+              BlockUCState.UNDER_RECOVERY,
+              new DatanodeDescriptor[] {dd1, dd2, dd3});
+          dd1.addBlockToBeRecovered(blockInfo);
+          DatanodeCommand[] cmds =
+              NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
+          assertEquals(1, cmds.length);
+          assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
+          BlockRecoveryCommand recoveryCommand = (BlockRecoveryCommand)cmds[0];
+          assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
+          DatanodeInfo[] recoveringNodes = recoveryCommand.getRecoveringBlocks()
+              .toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
+          assertEquals(3, recoveringNodes.length);
+          assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
+          assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
+          assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
+
+          // Test with one stale node.
+          dd1.setLastUpdate(System.currentTimeMillis());
+          // More than the default stale interval of 30 seconds.
+          dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
+          dd3.setLastUpdate(System.currentTimeMillis());
+          blockInfo = new BlockInfoUnderConstruction(
+              new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
+              BlockUCState.UNDER_RECOVERY,
+              new DatanodeDescriptor[] {dd1, dd2, dd3});
+          dd1.addBlockToBeRecovered(blockInfo);
+          cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
+          assertEquals(1, cmds.length);
+          assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
+          recoveryCommand = (BlockRecoveryCommand)cmds[0];
+          assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
+          recoveringNodes = recoveryCommand.getRecoveringBlocks()
+              .toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
+          assertEquals(2, recoveringNodes.length);
+          // dd2 is skipped.
+          assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
+          assertEquals(recoveringNodes[1], (DatanodeInfo)dd3);
+
+          // Test with all stale nodes.
+          dd1.setLastUpdate(System.currentTimeMillis() - 60 * 1000);
+          // More than the default stale interval of 30 seconds.
+          dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
+          dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000);
+          blockInfo = new BlockInfoUnderConstruction(
+              new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
+              BlockUCState.UNDER_RECOVERY,
+              new DatanodeDescriptor[] {dd1, dd2, dd3});
+          dd1.addBlockToBeRecovered(blockInfo);
+          cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
+          assertEquals(1, cmds.length);
+          assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
+          recoveryCommand = (BlockRecoveryCommand)cmds[0];
+          assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
+          recoveringNodes = recoveryCommand.getRecoveringBlocks()
+              .toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
+          // dd1 is the only non-stale node here, since it has just sent a
+          // heartbeat; with at most one live replica, all three expected
+          // locations are included in the recovery command.
+          assertEquals(3, recoveringNodes.length);
+          assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
+          assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
+          assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
+        }
+      } finally {
+        namesystem.writeUnlock();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
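
Taken together, the three cases above pin down the recovery-target selection
rule: replicas whose last heartbeat is older than the stale interval are
skipped, unless skipping would leave at most one live replica, in which case
every expected location participates. A sketch of that rule, inferred from
the assertions rather than copied from the namenode code:

    // staleInterval defaults to 30 seconds; isStale() compares against
    // the node's last heartbeat time.
    List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
    for (DatanodeDescriptor d : blockInfo.getExpectedLocations()) {
      if (!d.isStale(staleInterval)) {
        live.add(d);
      }
    }
    // Use the live subset only when it still holds more than one replica;
    // otherwise fall back to all expected locations (the third case above).
    DatanodeDescriptor[] targets = live.size() > 1
        ? live.toArray(new DatanodeDescriptor[live.size()])
        : blockInfo.getExpectedLocations();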

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Fri Apr 26 23:50:17 2013
@@ -1059,7 +1059,7 @@ public class NNThroughputBenchmark {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
         LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
-            prevBlock, null, INodeId.GRANDFATHER_INODE_ID);
+            prevBlock, null, INodeId.GRANDFATHER_INODE_ID, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr());

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java Fri Apr 26 23:50:17 2013
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.spy;
 import java.lang.reflect.Field;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -108,7 +109,7 @@ public class TestAddBlockRetry {
         if(count == 1) { // run second addBlock()
           LOG.info("Starting second addBlock for " + src);
           nn.addBlock(src, "clientName", null, null,
-              INodeId.GRANDFATHER_INODE_ID);
+              INodeId.GRANDFATHER_INODE_ID, null);
           LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
           assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
           lb2 = lbs.get(0);
@@ -119,7 +120,7 @@ public class TestAddBlockRetry {
       }
     }).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(),
         Mockito.<DatanodeDescriptor>any(), Mockito.<HashMap<Node, Node>>any(),
-        Mockito.anyLong());
+        Mockito.anyLong(), Mockito.<List<String>>any());
 
     // create file
     nn.create(src, FsPermission.getFileDefault(),
@@ -129,7 +130,7 @@ public class TestAddBlockRetry {
 
     // start first addBlock()
     LOG.info("Starting first addBlock for " + src);
-    nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID);
+    nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null);
 
     // check locations
     LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
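
The placement policy grows a matching parameter, typed List<String> here
rather than String[]. From the matchers alone, the extended hook has the
shape below (parameter names are assumptions; only the types are certain):

    // Inferred signature of the extended BlockPlacementPolicy hook.
    DatanodeDescriptor[] chooseTarget(String src,
                                      int numOfReplicas,
                                      DatanodeDescriptor writer,
                                      HashMap<Node, Node> excludedNodes,
                                      long blocksize,
                                      List<String> favoredNodes);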

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Fri Apr 26 23:50:17 2013
@@ -77,6 +77,7 @@ import org.apache.hadoop.util.ExitUtil.E
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
@@ -1061,6 +1062,10 @@ public class TestCheckpoint {
       secondary = startSecondaryNameNode(conf);
       secondary.doCheckpoint();
       
+      FSDirectory secondaryFsDir = secondary.getFSNamesystem().dir;
+      INode rootInMap = secondaryFsDir.getInode(secondaryFsDir.rootDir.getId());
+      Assert.assertSame(rootInMap, secondaryFsDir.rootDir);
+      
       fileSys.delete(tmpDir, true);
       fileSys.mkdirs(tmpDir);
       secondary.doCheckpoint();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java?rev=1476453&r1=1476452&r2=1476453&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java Fri Apr 26 23:50:17 2013
@@ -521,9 +521,17 @@ public class TestPipelinesFailover {
         storedBlock instanceof BlockInfoUnderConstruction);
     BlockInfoUnderConstruction ucBlock =
       (BlockInfoUnderConstruction)storedBlock;
-    // We expect that the first indexed replica will be the one
-    // to be in charge of the synchronization / recovery protocol.
-    DatanodeDescriptor expectedPrimary = ucBlock.getExpectedLocations()[0];
+    // We expect that the replica with the most recent heartbeat will be
+    // the one to be in charge of the synchronization / recovery protocol.
+    DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations();
+    DatanodeDescriptor expectedPrimary = datanodes[0];
+    long mostRecentLastUpdate = expectedPrimary.getLastUpdate();
+    for (int i = 1; i < datanodes.length; i++) {
+      if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) {
+        expectedPrimary = datanodes[i];
+        mostRecentLastUpdate = expectedPrimary.getLastUpdate();
+      }
+    }
     return expectedPrimary;
   }