Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/09/18 17:12:53 UTC

svn commit: r1524444 [2/3] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/or...

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Sep 18 15:12:52 2013
@@ -141,6 +141,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -2495,14 +2496,8 @@ public class FSNamesystem implements Nam
     }
 
     // choose targets for the new block to be allocated.
-    // TODO: chooseTarget(..) should be changed to return DatanodeStorageInfo's
-    final DatanodeDescriptor chosenDatanodes[] = getBlockManager().chooseTarget( 
+    final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget( 
         src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
-    final DatanodeStorageInfo[] targets = new DatanodeStorageInfo[chosenDatanodes.length];
-    for(int i = 0; i < targets.length; i++) {
-      final DatanodeDescriptor dd = chosenDatanodes[i];
-      targets[i] = dd.getStorageInfos().iterator().next(); 
-    }
 
     // Part II.
     // Allocate a new block, add it to the INode and the BlocksMap. 
@@ -2644,7 +2639,7 @@ public class FSNamesystem implements Nam
 
   LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
                                         long offset) throws IOException {
-    LocatedBlock lBlk = LocatedBlock.createLocatedBlock(
+    LocatedBlock lBlk = new LocatedBlock(
         getExtendedBlock(blk), locs, offset, false);
     getBlockManager().setBlockToken(
         lBlk, BlockTokenSecretManager.AccessMode.WRITE);
@@ -2653,7 +2648,8 @@ public class FSNamesystem implements Nam
 
   /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
   LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk,
-      final DatanodeInfo[] existings,  final Set<Node> excludes,
+      final DatanodeInfo[] existings, final String[] storageIDs,
+      final Set<Node> excludes,
       final int numAdditionalNodes, final String clientName
       ) throws IOException {
     //check if the feature is enabled
@@ -2661,7 +2657,7 @@ public class FSNamesystem implements Nam
 
     final DatanodeDescriptor clientnode;
     final long preferredblocksize;
-    final List<DatanodeDescriptor> chosen;
+    final List<DatanodeStorageInfo> chosen;
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
@@ -2679,23 +2675,18 @@ public class FSNamesystem implements Nam
       clientnode = file.getClientNode();
       preferredblocksize = file.getPreferredBlockSize();
 
-      //find datanode descriptors
-      chosen = new ArrayList<DatanodeDescriptor>();
-      for(DatanodeInfo d : existings) {
-        final DatanodeDescriptor descriptor = blockManager.getDatanodeManager(
-            ).getDatanode(d);
-        if (descriptor != null) {
-          chosen.add(descriptor);
-        }
-      }
+      //find datanode storages
+      final DatanodeManager dm = blockManager.getDatanodeManager();
+      chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs));
     } finally {
       readUnlock();
     }
 
     // choose new datanodes.
-    final DatanodeInfo[] targets = blockManager.getBlockPlacementPolicy(
+    final DatanodeStorageInfo[] targets = blockManager.getBlockPlacementPolicy(
         ).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
-        excludes, preferredblocksize);
+            // TODO: get storage type from the file
+        excludes, preferredblocksize, StorageType.DEFAULT);
     final LocatedBlock lb = new LocatedBlock(blk, targets);
     blockManager.setBlockToken(lb, AccessMode.COPY);
     return lb;
@@ -5634,9 +5625,9 @@ public class FSNamesystem implements Nam
       for (int i = 0; i < blocks.length; i++) {
         ExtendedBlock blk = blocks[i].getBlock();
         DatanodeInfo[] nodes = blocks[i].getLocations();
+        String[] storageIDs = blocks[i].getStorageIDs();
         for (int j = 0; j < nodes.length; j++) {
-          //TODO: add "storageID to LocatedBlock
-          blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], "STORAGE_ID", 
+          blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], storageIDs[j], 
               "client machine reported it");
         }
       }

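In the hunk above, FSNamesystem stops adapting DatanodeDescriptor results by hand: chooseTarget(..) now returns DatanodeStorageInfo[] directly, and getAdditionalDatanode(..) resolves the existing replicas through DatanodeManager.getDatanodeStorageInfos(existings, storageIDs). A minimal sketch of what that lookup presumably does, built only from the getDatanode(..) and getStorageInfo(..) accessors that appear elsewhere in this commit; the real method's error handling may differ:

    // Hedged sketch: resolve each (DatanodeInfo, storage ID) pair to the
    // matching DatanodeStorageInfo. The two input arrays are parallel.
    DatanodeStorageInfo[] getDatanodeStorageInfos(
        DatanodeInfo[] datanodes, String[] storageIDs) throws IOException {
      final DatanodeStorageInfo[] storages =
          new DatanodeStorageInfo[datanodes.length];
      for (int i = 0; i < datanodes.length; i++) {
        final DatanodeDescriptor dd = getDatanode(datanodes[i]);
        if (dd != null) {
          storages[i] = dd.getStorageInfo(storageIDs[i]);
        }
      }
      return storages;
    }
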
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Sep 18 15:12:52 2013
@@ -566,7 +566,8 @@ class NameNodeRpcServer implements Namen
 
   @Override // ClientProtocol
   public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
-      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final DatanodeInfo[] existings, final String[] existingStorageIDs,
+      final DatanodeInfo[] excludes,
       final int numAdditionalNodes, final String clientName
       ) throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -587,8 +588,8 @@ class NameNodeRpcServer implements Namen
         excludeSet.add(node);
       }
     }
-    return namesystem.getAdditionalDatanode(src, blk,
-        existings, excludeSet, numAdditionalNodes, clientName);
+    return namesystem.getAdditionalDatanode(src, blk, existings,
+        existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
   }
   /**
    * The client needs to give up on the block.

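The extra existingStorageIDs parameter means callers must send storage IDs in the same order as the existing nodes. A hedged sketch of the caller side, assembling the two parallel arrays from a LocatedBlock via the getLocations()/getStorageIDs() accessors this commit uses elsewhere (the helper itself is an illustration, not code from the commit):

    // Sketch only: every name outside the RPC signature (namenode, src, lb,
    // excludes, clientName) is a placeholder for the real caller's state.
    static LocatedBlock requestAdditionalDatanode(ClientProtocol namenode,
        String src, LocatedBlock lb, DatanodeInfo[] excludes,
        int numAdditionalNodes, String clientName) throws IOException {
      final DatanodeInfo[] existings = lb.getLocations();
      final String[] existingStorageIDs = lb.getStorageIDs();
      // The two arrays above are parallel: storage j belongs to node j.
      return namenode.getAdditionalDatanode(src, lb.getBlock(), existings,
          existingStorageIDs, excludes, numAdditionalNodes, clientName);
    }
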
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Wed Sep 18 15:12:52 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -158,7 +160,7 @@ public class NamenodeWebHdfsMethods {
 
   static DatanodeInfo chooseDatanode(final NameNode namenode,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      final long blocksize, Configuration conf) throws IOException {
+      final long blocksize, final Configuration conf) throws IOException {
     final BlockManager bm = namenode.getNamesystem().getBlockManager();
 
     if (op == PutOpParam.Op.CREATE) {
@@ -166,11 +168,13 @@ public class NamenodeWebHdfsMethods {
       final DatanodeDescriptor clientNode = bm.getDatanodeManager(
           ).getDatanodeByHost(getRemoteAddress());
       if (clientNode != null) {
-        final DatanodeDescriptor[] datanodes = bm.getBlockPlacementPolicy()
+        final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
             .chooseTarget(path, 1, clientNode,
-                new ArrayList<DatanodeDescriptor>(), false, null, blocksize);
-        if (datanodes.length > 0) {
-          return datanodes[0];
+                new ArrayList<DatanodeStorageInfo>(), false, null, blocksize,
+                // TODO: get storage type from the file
+                StorageType.DEFAULT);
+        if (storages.length > 0) {
+          return storages[0].getDatanodeDescriptor();
         }
       }
     } else if (op == GetOpParam.Op.OPEN

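Both chooseTarget(..) call sites above pass StorageType.DEFAULT as a stop-gap (see the TODO comments) until the storage type can be derived from the file. For orientation, a sketch of the StorageType enum as these changes appear to assume it; the actual members on the branch may differ:

    // Assumed shape of org.apache.hadoop.hdfs.StorageType; DEFAULT is
    // presumed to alias the spinning-disk type so that placement behavior
    // is unchanged for existing files.
    public enum StorageType {
      DISK,
      SSD;
      public static final StorageType DEFAULT = DISK;
    }
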
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Wed Sep 18 15:12:52 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 
 /****************************************************
  * A BlockCommand is an instruction to a datanode 
@@ -46,9 +47,10 @@ public class BlockCommand extends Datano
    */
   public static final long NO_ACK = Long.MAX_VALUE;
   
-  String poolId;
-  Block blocks[];
-  DatanodeInfo targets[][];
+  final String poolId;
+  final Block[] blocks;
+  final DatanodeInfo[][] targets;
+  final String[][] targetStorageIDs;
 
   /**
    * Create BlockCommand for transferring blocks to another datanode
@@ -60,21 +62,26 @@ public class BlockCommand extends Datano
     this.poolId = poolId;
     blocks = new Block[blocktargetlist.size()]; 
     targets = new DatanodeInfo[blocks.length][];
+    targetStorageIDs = new String[blocks.length][];
+
     for(int i = 0; i < blocks.length; i++) {
       BlockTargetPair p = blocktargetlist.get(i);
       blocks[i] = p.block;
-      targets[i] = p.targets;
+      targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
+      targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
     }
   }
 
-  private static final DatanodeInfo[][] EMPTY_TARGET = {};
+  private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
+  private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
 
   /**
    * Create BlockCommand for the given action
    * @param blocks blocks related to the action
    */
   public BlockCommand(int action, String poolId, Block blocks[]) {
-    this(action, poolId, blocks, EMPTY_TARGET);
+    this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
+        EMPTY_TARGET_STORAGEIDS);
   }
 
   /**
@@ -82,11 +89,12 @@ public class BlockCommand extends Datano
    * @param blocks blocks related to the action
    */
   public BlockCommand(int action, String poolId, Block[] blocks,
-      DatanodeInfo[][] targets) {
+      DatanodeInfo[][] targets, String[][] targetStorageIDs) {
     super(action);
     this.poolId = poolId;
     this.blocks = blocks;
     this.targets = targets;
+    this.targetStorageIDs = targetStorageIDs;
   }
   
   public String getBlockPoolId() {
@@ -100,4 +108,8 @@ public class BlockCommand extends Datano
   public DatanodeInfo[][] getTargets() {
     return targets;
   }
+
+  public String[][] getTargetStorageIDs() {
+    return targetStorageIDs;
+  }
 }

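The new constructor splits each BlockTargetPair into two parallel arrays via DatanodeStorageInfo.toDatanodeInfos(..) and DatanodeStorageInfo.toStorageIDs(..). A minimal sketch of those helpers, assuming only the getDatanodeDescriptor() and getStorageID() accessors seen elsewhere in this commit:

    // Hedged sketch; the real static helpers on DatanodeStorageInfo may
    // carry extra null handling.
    static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) {
      final DatanodeInfo[] datanodes = new DatanodeInfo[storages.length];
      for (int i = 0; i < storages.length; i++) {
        datanodes[i] = storages[i].getDatanodeDescriptor();
      }
      return datanodes;
    }

    static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
      final String[] storageIDs = new String[storages.length];
      for (int i = 0; i < storages.length; i++) {
        storageIDs[i] = storages[i].getStorageID();
      }
      return storageIDs;
    }
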
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Wed Sep 18 15:12:52 2013
@@ -141,6 +141,7 @@ message GetAdditionalDatanodeRequestProt
   repeated DatanodeInfoProto excludes = 4;
   required uint32 numAdditionalNodes = 5;
   required string clientName = 6;
+  repeated string existingStorageIDs = 7;
 }
 
 message GetAdditionalDatanodeResponseProto {

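Adding existingStorageIDs at the previously unused tag 7 keeps the message wire-compatible: an old client simply never sets the field, so the server sees an empty list and can fall back. A hedged sketch of the server-side read (the getExistingStorageIDsList() name follows standard protobuf-java codegen and is an assumption here):

    // Requires java.util.List. Old clients never set the field, so the
    // repeated list arrives empty.
    static String[] readExistingStorageIDs(
        GetAdditionalDatanodeRequestProto req) {
      final List<String> ids = req.getExistingStorageIDsList();
      return ids.isEmpty() ? null : ids.toArray(new String[ids.size()]);
    }
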
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Wed Sep 18 15:12:52 2013
@@ -105,10 +105,12 @@ message BlockCommandProto {
     INVALIDATE = 2; // Invalidate blocks
     SHUTDOWN = 3; // Shutdown the datanode
   }
+
   required Action action = 1;
   required string blockPoolId = 2;
   repeated BlockProto blocks = 3;
   repeated DatanodeInfosProto targets = 4;
+  repeated StorageIDsProto targetStorageIDs = 5;
 }
 
 /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Wed Sep 18 15:12:52 2013
@@ -122,6 +122,13 @@ enum StorageTypeProto {
 }
 
 /**
+ * A list of storage IDs. 
+ */
+message StorageIDsProto {
+  repeated string storageIDs = 1;
+}
+
+/**
  * A LocatedBlock gives information about a block and its location.
  */ 
 message LocatedBlockProto {

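StorageIDsProto exists because protobuf cannot encode a repeated field of repeated strings directly: the per-block storage IDs carried by BlockCommandProto form a String[][], so each inner list is wrapped in its own message, and the repeated targets and targetStorageIDs fields stay index-aligned per block. A hedged conversion sketch (builder method names assumed from standard protobuf-java codegen):

    // Requires java.util.ArrayList, java.util.Arrays, java.util.List.
    static List<StorageIDsProto> toStorageIDsProtos(String[][] targetStorageIDs) {
      final List<StorageIDsProto> protos =
          new ArrayList<StorageIDsProto>(targetStorageIDs.length);
      for (String[] ids : targetStorageIDs) {
        // One StorageIDsProto per block, wrapping that block's ID list.
        protos.add(StorageIDsProto.newBuilder()
            .addAllStorageIDs(Arrays.asList(ids))
            .build());
      }
      return protos;
    }
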
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Sep 18 15:12:52 2013
@@ -860,9 +860,35 @@ public class DFSTestUtil {
 
   public static DatanodeStorageInfo createDatanodeStorageInfo(
       String storageID, String ip) {
+    return createDatanodeStorageInfo(storageID, ip, "defaultRack");
+  }
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks) {
+    return createDatanodeStorageInfos(racks.length, racks);
+  }
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n, String... racks) {
+    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
+    for(int i = storages.length; i > 0; ) {
+      final String storageID = "s" + i;
+      final String ip = i + "." + i + "." + i + "." + i;
+      i--;
+      final String rack = i < racks.length? racks[i]: "defaultRack";
+      storages[i] = createDatanodeStorageInfo(storageID, ip, rack);
+    }
+    return storages;
+  }
+  public static DatanodeStorageInfo createDatanodeStorageInfo(
+      String storageID, String ip, String rack) {
+    final DatanodeStorage storage = new DatanodeStorage(storageID);
     return new DatanodeStorageInfo(
-        getDatanodeDescriptor(ip, "defaultRack"),
-        new DatanodeStorage(storageID));
+        BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage), storage);
+  }
+  public static DatanodeDescriptor[] toDatanodeDescriptor(
+      DatanodeStorageInfo[] storages) {
+    DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length];
+    for(int i = 0; i < datanodes.length; i++) {
+      datanodes[i] = storages[i].getDatanodeDescriptor();
+    }
+    return datanodes;
   }
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,

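The new factories give tests a rack-aware set of storages plus a way to recover the owning descriptors. A short usage sketch matching the pattern the updated tests below follow (storage IDs and IPs are generated as "s1", "1.1.1.1", and so on):

    @Test
    public void sketchUsage() {
      DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
          new String[] {"/rackA", "/rackB"});
      DatanodeDescriptor[] nodes = DFSTestUtil.toDatanodeDescriptor(storages);
      // Each storage unwraps to the descriptor at the same index.
      assertSame(nodes[0], storages[0].getDatanodeDescriptor());
    }
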
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Wed Sep 18 15:12:52 2013
@@ -539,8 +539,9 @@ public class TestPBHelper {
     dnInfos[0][0] = DFSTestUtil.getLocalDatanodeInfo();
     dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
     dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
+    String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
     BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
-        blocks, dnInfos);
+        blocks, dnInfos, storageIDs);
     BlockCommandProto bcProto = PBHelper.convert(bc);
     BlockCommand bc2 = PBHelper.convert(bcProto);
     assertEquals(bc.getAction(), bc2.getAction());

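The round-trip test now supplies per-target storage IDs whose ragged shape, {{"s00"}, {"s10", "s11"}}, mirrors dnInfos. A hedged sketch of how the equality checks could be extended to cover the new field (not part of the commit):

    // Verify targetStorageIDs survives the protobuf round trip.
    String[][] ids = bc.getTargetStorageIDs();
    String[][] ids2 = bc2.getTargetStorageIDs();
    assertEquals(ids.length, ids2.length);
    for (int i = 0; i < ids.length; i++) {
      assertArrayEquals(ids[i], ids2[i]);
    }
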
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Wed Sep 18 15:12:52 2013
@@ -227,10 +227,16 @@ public class BlockManagerTestUtil {
   
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
       String rackLocation, boolean initializeStorage) {
+    return getDatanodeDescriptor(ipAddr, rackLocation,
+        initializeStorage? new DatanodeStorage(DatanodeStorage.newStorageID()): null);
+  }
+
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, DatanodeStorage storage) {
       DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
           DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
-      if (initializeStorage) {
-        dn.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
+      if (storage != null) {
+        dn.updateStorage(storage);
       }
       return dn;
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Wed Sep 18 15:12:52 2013
@@ -22,10 +22,14 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -45,7 +49,6 @@ import org.apache.hadoop.net.NetworkTopo
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import static org.mockito.Mockito.*;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
@@ -53,6 +56,7 @@ import com.google.common.collect.LinkedL
 import com.google.common.collect.Lists;
 
 public class TestBlockManager {
+  private DatanodeStorageInfo[] storages;
   private List<DatanodeDescriptor> nodes;
   private List<DatanodeDescriptor> rackA;
   private List<DatanodeDescriptor> rackB;
@@ -81,14 +85,15 @@ public class TestBlockManager {
     fsn = Mockito.mock(FSNamesystem.class);
     Mockito.doReturn(true).when(fsn).hasWriteLock();
     bm = new BlockManager(fsn, fsn, conf);
-    nodes = ImmutableList.of(
-        BlockManagerTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA", true),
-        BlockManagerTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA", true),
-        BlockManagerTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA", true),
-        BlockManagerTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackB", true),
-        BlockManagerTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackB", true),
-        BlockManagerTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackB", true)
-    );
+    final String[] racks = {
+        "/rackA",
+        "/rackA",
+        "/rackA",
+        "/rackB",
+        "/rackB",
+        "/rackB"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    nodes = Arrays.asList(DFSTestUtil.toDatanodeDescriptor(storages));
     rackA = nodes.subList(0, 3);
     rackB = nodes.subList(3, 6);
   }
@@ -125,17 +130,18 @@ public class TestBlockManager {
   }
   
   private void doBasicTest(int testIndex) {
-    List<DatanodeDescriptor> origNodes = getNodes(0, 1);
+    List<DatanodeStorageInfo> origStorages = getStorages(0, 1);
+    List<DatanodeDescriptor> origNodes = getNodes(origStorages);
     BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
 
-    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
     assertEquals(2, pipeline.length);
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origStorages.contains(pipeline[0]));
     assertTrue("Destination of replication should be on the other rack. " +
         "Was: " + pipeline[1],
-        rackB.contains(pipeline[1]));
+        rackB.contains(pipeline[1].getDatanodeDescriptor()));
   }
   
 
@@ -156,21 +162,22 @@ public class TestBlockManager {
   
   private void doTestTwoOfThreeNodesDecommissioned(int testIndex) throws Exception {
     // Block originally on A1, A2, B1
-    List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
+    List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
+    List<DatanodeDescriptor> origNodes = getNodes(origStorages);
     BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
     
     // Decommission two of the nodes (A1, A2)
     List<DatanodeDescriptor> decomNodes = startDecommission(0, 1);
     
-    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origStorages.contains(pipeline[0]));
     assertEquals("Should have three targets", 3, pipeline.length);
     
     boolean foundOneOnRackA = false;
     for (int i = 1; i < pipeline.length; i++) {
-      DatanodeDescriptor target = pipeline[i];
+      DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
       if (rackA.contains(target)) {
         foundOneOnRackA = true;
       }
@@ -199,22 +206,23 @@ public class TestBlockManager {
 
   private void doTestAllNodesHoldingReplicasDecommissioned(int testIndex) throws Exception {
     // Block originally on A1, A2, B1
-    List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
+    List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
+    List<DatanodeDescriptor> origNodes = getNodes(origStorages);
     BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
     
     // Decommission all of the nodes
     List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 3);
     
-    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origStorages.contains(pipeline[0]));
     assertEquals("Should have three targets", 4, pipeline.length);
     
     boolean foundOneOnRackA = false;
     boolean foundOneOnRackB = false;
     for (int i = 1; i < pipeline.length; i++) {
-      DatanodeDescriptor target = pipeline[i];
+      DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
       if (rackA.contains(target)) {
         foundOneOnRackA = true;
       } else if (rackB.contains(target)) {
@@ -251,21 +259,22 @@ public class TestBlockManager {
   
   private void doTestOneOfTwoRacksDecommissioned(int testIndex) throws Exception {
     // Block originally on A1, A2, B1
-    List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
+    List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
+    List<DatanodeDescriptor> origNodes = getNodes(origStorages);
     BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
     
     // Decommission all of the nodes in rack A
     List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 2);
     
-    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origStorages.contains(pipeline[0]));
     assertEquals("Should have three targets", 3, pipeline.length);
     
     boolean foundOneOnRackB = false;
     for (int i = 1; i < pipeline.length; i++) {
-      DatanodeDescriptor target = pipeline[i];
+      DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
       if (rackB.contains(target)) {
         foundOneOnRackB = true;
       }
@@ -287,9 +296,9 @@ public class TestBlockManager {
     rackCNode.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
     addNodes(ImmutableList.of(rackCNode));
     try {
-      DatanodeDescriptor[] pipeline2 = scheduleSingleReplication(blockInfo);
+      DatanodeStorageInfo[] pipeline2 = scheduleSingleReplication(blockInfo);
       assertEquals(2, pipeline2.length);
-      assertEquals(rackCNode, pipeline2[1]);
+      assertEquals(rackCNode, pipeline2[1].getDatanodeDescriptor());
     } finally {
       removeNode(rackCNode);
     }
@@ -311,15 +320,15 @@ public class TestBlockManager {
     // Originally on only nodes in rack A.
     List<DatanodeDescriptor> origNodes = rackA;
     BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
-    DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
+    DatanodeStorageInfo pipeline[] = scheduleSingleReplication(blockInfo);
     
     assertEquals(2, pipeline.length); // single new copy
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origNodes.contains(pipeline[0].getDatanodeDescriptor()));
     assertTrue("Destination of replication should be on the other rack. " +
         "Was: " + pipeline[1],
-        rackB.contains(pipeline[1]));
+        rackB.contains(pipeline[1].getDatanodeDescriptor()));
   }
   
   @Test
@@ -354,19 +363,11 @@ public class TestBlockManager {
    * pipeline.
    */
   private void fulfillPipeline(BlockInfo blockInfo,
-      DatanodeDescriptor[] pipeline) throws IOException {
+      DatanodeStorageInfo[] pipeline) throws IOException {
     for (int i = 1; i < pipeline.length; i++) {
-      DatanodeDescriptor dn = pipeline[i];
-      
-      Iterator<DatanodeStorageInfo> iterator = dn.getStorageInfos().iterator();
-      if (iterator.hasNext()) {
-        DatanodeStorageInfo storage = iterator.next();
-        bm.addBlock(dn, storage.getStorageID(), blockInfo, null);
-        blockInfo.addStorage(storage);
-      } else {
-        throw new RuntimeException("Storage info on node: " + dn.getHostName()
-            + " is invalid.");
-      }
+      DatanodeStorageInfo storage = pipeline[i];
+      bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null);
+      blockInfo.addStorage(storage);
     }
   }
 
@@ -389,6 +390,22 @@ public class TestBlockManager {
     }
     return ret;
   }
+
+  private List<DatanodeDescriptor> getNodes(List<DatanodeStorageInfo> storages) {
+    List<DatanodeDescriptor> ret = Lists.newArrayList();
+    for (DatanodeStorageInfo s : storages) {
+      ret.add(s.getDatanodeDescriptor());
+    }
+    return ret;
+  }
+
+  private List<DatanodeStorageInfo> getStorages(int ... indexes) {
+    List<DatanodeStorageInfo> ret = Lists.newArrayList();
+    for (int idx : indexes) {
+      ret.add(storages[idx]);
+    }
+    return ret;
+  }
   
   private List<DatanodeDescriptor> startDecommission(int ... indexes) {
     List<DatanodeDescriptor> nodes = getNodes(indexes);
@@ -407,7 +424,7 @@ public class TestBlockManager {
     return blockInfo;
   }
 
-  private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
+  private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
     // list for priority 1
     List<Block> list_p1 = new ArrayList<Block>();
     list_p1.add(block);
@@ -425,27 +442,29 @@ public class TestBlockManager {
     assertTrue("replication is pending after work is computed",
         bm.pendingReplications.getNumReplicas(block) > 0);
 
-    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
+    LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
     assertEquals(1, repls.size());
-    Entry<DatanodeDescriptor, BlockTargetPair> repl =
+    Entry<DatanodeStorageInfo, BlockTargetPair> repl =
       repls.entries().iterator().next();
         
-    DatanodeDescriptor[] targets = repl.getValue().targets;
+    DatanodeStorageInfo[] targets = repl.getValue().targets;
 
-    DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
+    DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
     pipeline[0] = repl.getKey();
     System.arraycopy(targets, 0, pipeline, 1, targets.length);
 
     return pipeline;
   }
 
-  private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
-    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
+  private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
+    LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
       LinkedListMultimap.create();
     for (DatanodeDescriptor dn : nodes) {
       List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
       if (thisRepls != null) {
-        repls.putAll(dn, thisRepls);
+        for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
+          repls.putAll(storage, thisRepls);
+        }
       }
     }
     return repls;
@@ -468,7 +487,7 @@ public class TestBlockManager {
     addBlockOnNodes(blockId,origNodes.subList(0,1));
 
     List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
-    List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
+    List<DatanodeStorageInfo> liveNodes = new LinkedList<DatanodeStorageInfo>();
 
     assertNotNull("Chooses source node for a highest-priority replication"
         + " even if all available source nodes have reached their replication"
@@ -491,7 +510,7 @@ public class TestBlockManager {
             UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
 
     // Increase the replication count to test replication count > hard limit
-    DatanodeDescriptor targets[] = { origNodes.get(1) };
+    DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
     origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
 
     assertNull("Does not choose a source node for a highest-priority"
@@ -507,8 +526,7 @@ public class TestBlockManager {
   @Test
   public void testSafeModeIBR() throws Exception {
     DatanodeDescriptor node = spy(nodes.get(0));
-    Iterator<DatanodeStorageInfo> i = node.getStorageInfos().iterator();
-    DatanodeStorageInfo ds = i.next();
+    DatanodeStorageInfo ds = node.getStorageInfos()[0];
     node.setStorageID(ds.getStorageID());
 
     node.isAlive = true;
@@ -552,8 +570,7 @@ public class TestBlockManager {
   @Test
   public void testSafeModeIBRAfterIncremental() throws Exception {
     DatanodeDescriptor node = spy(nodes.get(0));
-    Iterator<DatanodeStorageInfo> i = node.getStorageInfos().iterator();
-    DatanodeStorageInfo ds = i.next();
+    DatanodeStorageInfo ds = node.getStorageInfos()[0];
     node.setStorageID(ds.getStorageID());
     node.isAlive = true;
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java Wed Sep 18 15:12:52 2013
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -60,9 +59,9 @@ public class TestDatanodeDescriptor {
     assertEquals(0, dd.numBlocks());
     BlockInfo blk = new BlockInfo(new Block(1L), 1);
     BlockInfo blk1 = new BlockInfo(new Block(2L), 2);
-    Iterator<DatanodeStorageInfo> iterator = dd.getStorageInfos().iterator();
-    assertTrue(iterator.hasNext());
-    final String storageID = iterator.next().getStorageID();
+    DatanodeStorageInfo[] storages = dd.getStorageInfos();
+    assertTrue(storages.length > 0);
+    final String storageID = storages[0].getStorageID();
     // add first block
     assertTrue(dd.addBlock(storageID, blk));
     assertEquals(1, dd.numBlocks());

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java Wed Sep 18 15:12:52 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Test;
 
 /**
@@ -62,6 +63,8 @@ public class TestHeartbeatHandling {
       final DatanodeRegistration nodeReg =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
       final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
+      final String storageID = DatanodeStorage.newStorageID();
+      dd.updateStorage(new DatanodeStorage(storageID));
 
       final int REMAINING_BLOCKS = 1;
       final int MAX_REPLICATE_LIMIT =
@@ -69,7 +72,7 @@ public class TestHeartbeatHandling {
       final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
       final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
       final int MAX_REPLICATE_BLOCKS = 2*MAX_REPLICATE_LIMIT+REMAINING_BLOCKS;
-      final DatanodeDescriptor[] ONE_TARGET = new DatanodeDescriptor[1];
+      final DatanodeStorageInfo[] ONE_TARGET = {dd.getStorageInfo(storageID)};
 
       try {
         namesystem.writeLock();
@@ -143,12 +146,15 @@ public class TestHeartbeatHandling {
       final DatanodeRegistration nodeReg1 =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
       final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
+      dd1.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
       final DatanodeRegistration nodeReg2 =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
       final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
+      dd2.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
       final DatanodeRegistration nodeReg3 = 
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
       final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
+      dd3.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
 
       try {
         namesystem.writeLock();
@@ -162,9 +168,9 @@ public class TestHeartbeatHandling {
           dd2.setLastUpdate(System.currentTimeMillis());
           dd3.setLastUpdate(System.currentTimeMillis());
           final DatanodeStorageInfo[] storages = {
-              dd1.getStorageInfos().iterator().next(),
-              dd2.getStorageInfos().iterator().next(),
-              dd3.getStorageInfos().iterator().next()};
+              dd1.getStorageInfos()[0],
+              dd2.getStorageInfos()[0],
+              dd3.getStorageInfos()[0]};
           BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
               new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
               BlockUCState.UNDER_RECOVERY, storages);

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java Wed Sep 18 15:12:52 2013
@@ -43,8 +43,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.junit.Test;
 
-import com.google.common.base.Preconditions;
-
 /**
  * This class tests the internals of PendingReplicationBlocks.java,
  * as well as how PendingReplicationBlocks acts in BlockManager
@@ -54,22 +52,7 @@ public class TestPendingReplication {
   private static final int DFS_REPLICATION_INTERVAL = 1;
   // Number of datanodes in the cluster
   private static final int DATANODE_COUNT = 5;
-  
-  private DatanodeDescriptor genDatanodeId(int seed) {
-    seed = seed % 256;
-    String ip = seed + "." + seed + "." + seed + "." + seed;
-    return DFSTestUtil.getDatanodeDescriptor(ip, null);
-  }
 
-  private DatanodeDescriptor[] genDatanodes(int number) {
-    Preconditions.checkArgument(number >= 0);
-    DatanodeDescriptor[] nodes = new DatanodeDescriptor[number];
-    for (int i = 0; i < number; i++) {
-      nodes[i] = genDatanodeId(i);
-    }
-    return nodes;
-  }
-  
   @Test
   public void testPendingReplication() {
     PendingReplicationBlocks pendingReplications;
@@ -79,9 +62,12 @@ public class TestPendingReplication {
     //
     // Add 10 blocks to pendingReplications.
     //
-    for (int i = 0; i < 10; i++) {
+    DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
+    for (int i = 0; i < storages.length; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.increment(block, genDatanodes(i));
+      DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
+      System.arraycopy(storages, 0, targets, 0, i);
+      pendingReplications.increment(block, targets);
     }
     assertEquals("Size of pendingReplications ",
                  10, pendingReplications.size());
@@ -91,16 +77,18 @@ public class TestPendingReplication {
     // remove one item and reinsert it
     //
     Block blk = new Block(8, 8, 0);
-    pendingReplications.decrement(blk, genDatanodeId(7)); // removes one replica
+    pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor(),
+        storages[7].getStorageID()); // removes one replica
     assertEquals("pendingReplications.getNumReplicas ",
                  7, pendingReplications.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
       // removes all replicas
-      pendingReplications.decrement(blk, genDatanodeId(i));
+      pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor(),
+          storages[i].getStorageID());
     }
     assertTrue(pendingReplications.size() == 9);
-    pendingReplications.increment(blk, genDatanodes(8));
+    pendingReplications.increment(blk, DFSTestUtil.createDatanodeStorageInfos(8));
     assertTrue(pendingReplications.size() == 10);
 
     //
@@ -128,7 +116,7 @@ public class TestPendingReplication {
 
     for (int i = 10; i < 15; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.increment(block, genDatanodes(i));
+      pendingReplications.increment(block, DFSTestUtil.createDatanodeStorageInfos(i));
     }
     assertTrue(pendingReplications.size() == 15);
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Wed Sep 18 15:12:52 2013
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,6 +36,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -67,6 +70,10 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 public class TestReplicationPolicy {
+  {
+    ((Log4JLogger)BlockPlacementPolicy.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   private Random random = DFSUtil.getRandom();
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 6;
@@ -75,7 +82,7 @@ public class TestReplicationPolicy {
   private static BlockPlacementPolicy replicator;
   private static final String filename = "/dummyfile.txt";
   private static DatanodeDescriptor dataNodes[];
-  private static String[] storageIDs;
+  private static DatanodeStorageInfo[] storages;
   // The interval for marking a datanode as stale,
   private static long staleInterval = 
       DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
@@ -86,14 +93,15 @@ public class TestReplicationPolicy {
   @BeforeClass
   public static void setupCluster() throws Exception {
     Configuration conf = new HdfsConfiguration();
-    dataNodes = new DatanodeDescriptor[] {
-        DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
-        DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),
-        DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2"),
-        DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2"),
-        DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d2/r3"),
-        DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3")
-      };
+    final String[] racks = {
+        "/d1/r1",
+        "/d1/r1",
+        "/d1/r2",
+        "/d1/r2",
+        "/d2/r3",
+        "/d2/r3"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
 
     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
@@ -124,6 +132,13 @@ public class TestReplicationPolicy {
     }    
   }
 
+  private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
+    return isOnSameRack(left, right.getDatanodeDescriptor());
+  }
+
+  private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
+    return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
+  }
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on 
@@ -139,69 +154,69 @@ public class TestReplicationPolicy {
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
 
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1);
     assertEquals(targets.length, 1);
-    assertEquals(targets[0], dataNodes[0]);
+    assertEquals(storages[0], targets[0]);
     
     targets = chooseTarget(2);
     assertEquals(targets.length, 2);
-    assertEquals(targets[0], dataNodes[0]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertEquals(storages[0], targets[0]);
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(3);
     assertEquals(targets.length, 3);
-    assertEquals(targets[0], dataNodes[0]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertEquals(storages[0], targets[0]);
+    assertFalse(isOnSameRack(targets[0], targets[1]));
+    assertTrue(isOnSameRack(targets[1], targets[2]));
 
     targets = chooseTarget(4);
     assertEquals(targets.length, 4);
-    assertEquals(targets[0], dataNodes[0]);
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
-               cluster.isOnSameRack(targets[2], targets[3]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    assertEquals(storages[0], targets[0]);
+    assertTrue(isOnSameRack(targets[1], targets[2]) ||
+               isOnSameRack(targets[2], targets[3]));
+    assertFalse(isOnSameRack(targets[0], targets[2]));
     
     dataNodes[0].updateHeartbeat(
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
     return chooseTarget(numOfReplicas, dataNodes[0]);
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
       DatanodeDescriptor writer) {
     return chooseTarget(numOfReplicas, writer,
-        new ArrayList<DatanodeDescriptor>());
+        new ArrayList<DatanodeStorageInfo>());
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-      List<DatanodeDescriptor> chosenNodes) {
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      List<DatanodeStorageInfo> chosenNodes) {
     return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-      DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, null);
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-      List<DatanodeDescriptor> chosenNodes, Set<Node> excludedNodes) {
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) {
     return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
   }
 
-  private static DatanodeDescriptor[] chooseTarget(
+  private static DatanodeStorageInfo[] chooseTarget(
       int numOfReplicas,
       DatanodeDescriptor writer,
-      List<DatanodeDescriptor> chosenNodes,
+      List<DatanodeStorageInfo> chosenNodes,
       Set<Node> excludedNodes) {
     return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
-        false, excludedNodes, BLOCK_SIZE);
+        false, excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
   }
 
   /**
@@ -215,8 +230,8 @@ public class TestReplicationPolicy {
   @Test
   public void testChooseTarget2() throws Exception { 
     Set<Node> excludedNodes;
-    DatanodeDescriptor[] targets;
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    DatanodeStorageInfo[] targets;
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
     
     excludedNodes = new HashSet<Node>();
     excludedNodes.add(dataNodes[1]); 
@@ -228,49 +243,52 @@ public class TestReplicationPolicy {
     excludedNodes.add(dataNodes[1]); 
     targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);
-    assertEquals(targets[0], dataNodes[0]);
+    assertEquals(storages[0], targets[0]);
     
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.add(dataNodes[1]); 
     targets = chooseTarget(2, chosenNodes, excludedNodes);
     assertEquals(targets.length, 2);
-    assertEquals(targets[0], dataNodes[0]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertEquals(storages[0], targets[0]);
+
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.add(dataNodes[1]); 
     targets = chooseTarget(3, chosenNodes, excludedNodes);
     assertEquals(targets.length, 3);
-    assertEquals(targets[0], dataNodes[0]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertEquals(storages[0], targets[0]);
+
+    assertFalse(isOnSameRack(targets[0], targets[1]));
+    assertTrue(isOnSameRack(targets[1], targets[2]));
     
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.add(dataNodes[1]); 
     targets = chooseTarget(4, chosenNodes, excludedNodes);
     assertEquals(targets.length, 4);
-    assertEquals(targets[0], dataNodes[0]);
+    assertEquals(storages[0], targets[0]);
+
     for(int i=1; i<4; i++) {
-      assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+      assertFalse(isOnSameRack(targets[0], targets[i]));
     }
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
-               cluster.isOnSameRack(targets[2], targets[3]));
-    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+    assertTrue(isOnSameRack(targets[1], targets[2]) ||
+               isOnSameRack(targets[2], targets[3]));
+    assertFalse(isOnSameRack(targets[1], targets[3]));
 
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.add(dataNodes[1]); 
-    chosenNodes.add(dataNodes[2]);
+    chosenNodes.add(storages[2]);
     targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
-        excludedNodes, BLOCK_SIZE);
+        excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
     System.out.println("targets=" + Arrays.asList(targets));
     assertEquals(2, targets.length);
     //make sure that the chosen node is among the targets.
     int i = 0;
-    for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
+    for (; i < targets.length && !storages[2].equals(targets[i]); i++);
     assertTrue(i < targets.length);
   }
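
The expected side of these assertions changes from dataNodes[i] to
storages[i], a DatanodeStorageInfo[] fixture that is used but not defined
in this diff. Presumably it is kept index-aligned with dataNodes, so that
storages[i].getDatanodeDescriptor() == dataNodes[i]. A minimal sketch of
such a setup, with hypothetical helper names and a rack layout shown only
for illustration:

    // Assumed fixture: one storage per datanode, index-aligned with the
    // existing dataNodes array so either representation can be asserted.
    private static final DatanodeStorageInfo[] storages =
        DFSTestUtil.createDatanodeStorageInfos(
            "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3");
    private static final DatanodeDescriptor[] dataNodes =
        DFSTestUtil.toDatanodeDescriptor(storages);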
 
@@ -289,34 +307,34 @@ public class TestReplicationPolicy {
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
         
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1);
     assertEquals(targets.length, 1);
-    assertEquals(targets[0], dataNodes[1]);
+    assertEquals(storages[1], targets[0]);
     
     targets = chooseTarget(2);
     assertEquals(targets.length, 2);
-    assertEquals(targets[0], dataNodes[1]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertEquals(storages[1], targets[0]);
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(3);
     assertEquals(targets.length, 3);
-    assertEquals(targets[0], dataNodes[1]);
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertEquals(storages[1], targets[0]);
+    assertTrue(isOnSameRack(targets[1], targets[2]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(4);
     assertEquals(targets.length, 4);
-    assertEquals(targets[0], dataNodes[1]);
+    assertEquals(storages[1], targets[0]);
     for(int i=1; i<4; i++) {
-      assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+      assertFalse(isOnSameRack(targets[0], targets[i]));
     }
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
-               cluster.isOnSameRack(targets[2], targets[3]));
-    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+    assertTrue(isOnSameRack(targets[1], targets[2]) ||
+               isOnSameRack(targets[2], targets[3]));
+    assertFalse(isOnSameRack(targets[1], targets[3]));
 
     dataNodes[0].updateHeartbeat(
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
@@ -340,27 +358,27 @@ public class TestReplicationPolicy {
           (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
     }
       
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1);
     assertEquals(targets.length, 1);
-    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
     
     targets = chooseTarget(2);
     assertEquals(targets.length, 2);
-    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(3);
     assertEquals(targets.length, 3);
     for(int i=0; i<3; i++) {
-      assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+      assertFalse(isOnSameRack(targets[i], dataNodes[0]));
     }
-    assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
-               cluster.isOnSameRack(targets[1], targets[2]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    assertTrue(isOnSameRack(targets[0], targets[1]) ||
+               isOnSameRack(targets[1], targets[2]));
+    assertFalse(isOnSameRack(targets[0], targets[2]));
     
     for(int i=0; i<2; i++) {
       dataNodes[i].updateHeartbeat(
@@ -381,7 +399,7 @@ public class TestReplicationPolicy {
     DatanodeDescriptor writerDesc =
       DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4");
 
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0, writerDesc);
     assertEquals(targets.length, 0);
 
@@ -390,12 +408,12 @@ public class TestReplicationPolicy {
 
     targets = chooseTarget(2, writerDesc);
     assertEquals(targets.length, 2);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
 
     targets = chooseTarget(3, writerDesc);
     assertEquals(targets.length, 3);
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertTrue(isOnSameRack(targets[1], targets[2]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
   }
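
Several convenience overloads used by these tests, chooseTarget(int),
chooseTarget(int, DatanodeDescriptor) and chooseTarget(int, List), are not
shown in this diff. Given the three-argument helper earlier in the file,
they presumably reduce to it; a sketch of the writer variant used above:

    // Sketch only: default the chosen-node list, mirroring the visible
    // three-argument helper.
    private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
        DatanodeDescriptor writer) {
      return chooseTarget(numOfReplicas, writer,
          new ArrayList<DatanodeStorageInfo>());
    }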
 
   /**
@@ -436,7 +454,7 @@ public class TestReplicationPolicy {
     
     // try to choose NUM_OF_DATANODES targets, which is more than the
     // number of nodes actually available.
-    DatanodeDescriptor[] targets = chooseTarget(NUM_OF_DATANODES);
+    DatanodeStorageInfo[] targets = chooseTarget(NUM_OF_DATANODES);
     assertEquals(targets.length, NUM_OF_DATANODES - 2);
 
     final List<LoggingEvent> log = appender.getLog();
@@ -456,18 +474,30 @@ public class TestReplicationPolicy {
     }
   }
 
-  private boolean containsWithinRange(DatanodeDescriptor target,
+  private boolean containsWithinRange(DatanodeStorageInfo target,
       DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
     assert startIndex >= 0 && startIndex < nodes.length;
     assert endIndex >= startIndex && endIndex < nodes.length;
     for (int i = startIndex; i <= endIndex; i++) {
-      if (nodes[i].equals(target)) {
+      if (nodes[i].equals(target.getDatanodeDescriptor())) {
         return true;
       }
     }
     return false;
   }
   
+  private boolean containsWithinRange(DatanodeDescriptor target,
+      DatanodeStorageInfo[] nodes, int startIndex, int endIndex) {
+    assert startIndex >= 0 && startIndex < nodes.length;
+    assert endIndex >= startIndex && endIndex < nodes.length;
+    for (int i = startIndex; i <= endIndex; i++) {
+      if (nodes[i].getDatanodeDescriptor().equals(target)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
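The two containsWithinRange(..) overloads are symmetric: each unwraps the
DatanodeStorageInfo side via getDatanodeDescriptor() and compares at the
descriptor level, so mixed-representation checks read the same either way.
Illustrative calls (indices chosen arbitrarily):

    // Either argument order is now available:
    assertTrue(containsWithinRange(targets[0], dataNodes, 0, 2));
    assertTrue(containsWithinRange(dataNodes[0], targets, 0, 2));
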
   @Test
   public void testChooseTargetWithStaleNodes() throws Exception {
     // Set dataNodes[0] as stale
@@ -476,19 +506,19 @@ public class TestReplicationPolicy {
       .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
     assertTrue(namenode.getNamesystem().getBlockManager()
         .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     // We set dataNodes[0] as stale, so dataNodes[1] should be chosen since
     // dataNodes[1] is on the same rack as dataNodes[0] (the writer)
     targets = chooseTarget(1);
     assertEquals(targets.length, 1);
-    assertEquals(targets[0], dataNodes[1]);
+    assertEquals(storages[1], targets[0]);
 
     Set<Node> excludedNodes = new HashSet<Node>();
     excludedNodes.add(dataNodes[1]);
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
     targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);
-    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
     
     // reset
     dataNodes[0].setLastUpdate(Time.now());
@@ -513,7 +543,7 @@ public class TestReplicationPolicy {
     namenode.getNamesystem().getBlockManager()
       .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
 
-    DatanodeDescriptor[] targets = chooseTarget(0);
+    DatanodeStorageInfo[] targets = chooseTarget(0);
     assertEquals(targets.length, 0);
 
     // Since we have 6 datanodes total, stale nodes should
@@ -585,11 +615,12 @@ public class TestReplicationPolicy {
           .getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
       BlockPlacementPolicy replicator = miniCluster.getNameNode()
           .getNamesystem().getBlockManager().getBlockPlacementPolicy();
-      DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
-          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
+      DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
+          staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
+          BLOCK_SIZE, StorageType.DEFAULT);
 
       assertEquals(targets.length, 3);
-      assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+      assertFalse(isOnSameRack(targets[0], staleNodeInfo));
       
       // Step 2. Set more than half of the datanodes as stale
       for (int i = 0; i < 4; i++) {
@@ -610,10 +641,11 @@ public class TestReplicationPolicy {
       assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
           .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
       // Call chooseTarget
-      targets = replicator.chooseTarget(filename, 3,
-          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
+      targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
+          new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
+          StorageType.DEFAULT);
       assertEquals(targets.length, 3);
-      assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
+      assertTrue(isOnSameRack(targets[0], staleNodeInfo));
       
       // Step 3. Set 2 of the stale datanodes back to healthy, so that
       // 2 stale nodes still remain
@@ -635,7 +667,7 @@ public class TestReplicationPolicy {
       // Call chooseTarget
       targets = chooseTarget(3, staleNodeInfo);
       assertEquals(targets.length, 3);
-      assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+      assertFalse(isOnSameRack(targets[0], staleNodeInfo));
     } finally {
       miniCluster.shutdown();
     }
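
Taken together, the direct replicator.chooseTarget(..) calls in this test
show that the placement-policy entry point now returns
DatanodeStorageInfo[] and takes a trailing StorageType argument. The shape
implied by the call sites, with illustrative parameter names that are not
taken from the commit:

    DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
        DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes,
        boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize,
        StorageType storageType);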
@@ -650,26 +682,26 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testRereplicate1() throws Exception {
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
-    chosenNodes.add(dataNodes[0]);    
-    DatanodeDescriptor[] targets;
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    chosenNodes.add(storages[0]);    
+    DatanodeStorageInfo[] targets;
     
     targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
     
     targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
-    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(3, chosenNodes);
     assertEquals(targets.length, 3);
-    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], targets[2]));
   }
 
   /**
@@ -681,22 +713,22 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testRereplicate2() throws Exception {
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
-    chosenNodes.add(dataNodes[0]);
-    chosenNodes.add(dataNodes[1]);
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    chosenNodes.add(storages[0]);
+    chosenNodes.add(storages[1]);
 
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
     
     targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[1], dataNodes[0]));
   }
 
   /**
@@ -708,31 +740,31 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testRereplicate3() throws Exception {
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
-    chosenNodes.add(dataNodes[0]);
-    chosenNodes.add(dataNodes[2]);
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    chosenNodes.add(storages[0]);
+    chosenNodes.add(storages[2]);
     
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
-    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
-    assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[2]));
     
     targets = chooseTarget(1, dataNodes[2], chosenNodes);
     assertEquals(targets.length, 1);
-    assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[2]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
 
     targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
-    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
     
     targets = chooseTarget(2, dataNodes[2], chosenNodes);
     assertEquals(targets.length, 2);
-    assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[2]));
   }
   
   /**
@@ -1174,4 +1206,4 @@ public class TestReplicationPolicy {
     chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
     assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
   }
-}
\ No newline at end of file
+}