Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/29 20:16:38 UTC

svn commit: r1097905 [11/14] - in /hadoop/hdfs/trunk: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/j...

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java Fri Apr 29 18:16:32 2011
@@ -32,12 +32,14 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.log4j.Level;
 
 /**
@@ -62,8 +64,9 @@ public class TestFileCorruption extends 
       FileSystem fs = cluster.getFileSystem();
       util.createFiles(fs, "/srcdat");
       // Now deliberately remove the blocks
-      File data_dir = new File(System.getProperty("test.build.data"),
-                               "dfs/data/data5/current");
+      File storageDir = MiniDFSCluster.getStorageDir(2, 0);
+      String bpid = cluster.getNamesystem().getBlockPoolId();
+      File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
       assertTrue("data directory does not exist", data_dir.exists());
       File[] blocks = data_dir.listFiles();
       assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
@@ -122,12 +125,14 @@ public class TestFileCorruption extends 
       DFSTestUtil.createFile(fs, FILE_PATH, FILE_LEN, (short)2, 1L);
       
       // get the block
-      File dataDir = new File(cluster.getDataDirectory(),
-          "data1" + MiniDFSCluster.FINALIZED_DIR_NAME);
-      Block blk = getBlock(dataDir);
+      final String bpid = cluster.getNamesystem().getBlockPoolId();
+      File storageDir = MiniDFSCluster.getStorageDir(0, 0);
+      File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+      ExtendedBlock blk = getBlock(bpid, dataDir);
       if (blk == null) {
-        blk = getBlock(new File(cluster.getDataDirectory(),
-          "dfs/data/data2" + MiniDFSCluster.FINALIZED_DIR_NAME));
+        storageDir = MiniDFSCluster.getStorageDir(0, 1);
+        dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+        blk = getBlock(bpid, dataDir);
       }
       assertFalse(blk==null);
 
@@ -138,8 +143,10 @@ public class TestFileCorruption extends 
       DataNode dataNode = datanodes.get(2);
       
       // report corrupted block by the third datanode
+      DatanodeRegistration dnR = 
+        DataNodeTestUtils.getDNRegistrationForBP(dataNode, blk.getBlockPoolId());
       cluster.getNamesystem().markBlockAsCorrupt(blk, 
-          new DatanodeInfo(dataNode.dnRegistration ));
+          new DatanodeInfo(dnR));
       
       // open the file
       fs.open(FILE_PATH);
@@ -152,7 +159,7 @@ public class TestFileCorruption extends 
     
   }
   
-  private Block getBlock(File dataDir) {
+  private ExtendedBlock getBlock(String bpid, File dataDir) {
     assertTrue("data directory does not exist", dataDir.exists());
     File[] blocks = dataDir.listFiles();
     assertTrue("Blocks do not exist in dataDir", (blocks != null) && (blocks.length > 0));
@@ -179,6 +186,6 @@ public class TestFileCorruption extends 
         break;
       }
     }
-    return new Block(blockId, blocks[idx].length(), blockTimeStamp);
+    return new ExtendedBlock(bpid, blockId, blocks[idx].length(), blockTimeStamp);
   }
 }

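The TestFileCorruption hunks above replace a hard-coded dfs/data/data5/current path with block-pool-aware helpers. For reference, a minimal sketch of that lookup pattern, using only calls visible in the diff; the class name and the datanode/storage indices are illustrative, not part of this patch:

package org.apache.hadoop.hdfs;

import java.io.File;

import org.apache.hadoop.conf.Configuration;

public class FinalizedDirExample {  // hypothetical class name
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    try {
      cluster.waitActive();
      // With federation, every data directory holds one subtree per block pool,
      // so the pool id is needed to resolve the finalized directory.
      String bpid = cluster.getNamesystem().getBlockPoolId();
      // Third datanode (index 2), first storage directory (index 0), as in the test above.
      File storageDir = MiniDFSCluster.getStorageDir(2, 0);
      File finalizedDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
      System.out.println("finalized block dir: " + finalizedDir);
    } finally {
      cluster.shutdown();
    }
  }
}
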
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Fri Apr 29 18:16:32 2011
@@ -27,12 +27,9 @@ import java.util.EnumSet;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -40,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -799,8 +797,9 @@ public class TestFileCreation extends ju
       for(DatanodeInfo datanodeinfo: locatedblock.getLocations()) {
         DataNode datanode = cluster.getDataNode(datanodeinfo.ipcPort);
         FSDataset dataset = (FSDataset)datanode.data;
-        Block b = dataset.getStoredBlock(locatedblock.getBlock().getBlockId());
-        File blockfile = dataset.findBlockFile(b.getBlockId());
+        ExtendedBlock blk = locatedblock.getBlock();
+        Block b = dataset.getStoredBlock(blk.getBlockPoolId(), blk.getBlockId());
+        File blockfile = dataset.findBlockFile(blk.getBlockPoolId(), b.getBlockId());
         System.out.println("blockfile=" + blockfile);
         if (blockfile != null) {
           BufferedReader in = new BufferedReader(new FileReader(blockfile));
@@ -865,10 +864,10 @@ public class TestFileCreation extends ju
       dfs = (DistributedFileSystem)cluster.getFileSystem();
 
       // create a new file.
-      final String f = DIR + "dhrubashutdown";
+      final String f = DIR + "testFsCloseAfterClusterShutdown";
       final Path fpath = new Path(f);
       FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
-      out.write("something_dhruba".getBytes());
+      out.write("something_test".getBytes());
       out.hflush();    // ensure that block is allocated
 
       // shutdown last datanode in pipeline.

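In the TestFileCreation hunk above, FSDataset lookups now take the block pool id carried by the ExtendedBlock returned from LocatedBlock.getBlock(). A small sketch of that pattern, restricted to calls shown in the hunk; the helper class and method name are made up:

package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;

class BlockFileLookup {  // hypothetical helper class
  /** Resolve the on-disk file backing a located block on one datanode. */
  static File blockFileFor(DataNode datanode, LocatedBlock locatedblock) throws IOException {
    FSDataset dataset = (FSDataset) datanode.data;
    // The ExtendedBlock carries the block pool id needed by the dataset lookups.
    ExtendedBlock blk = locatedblock.getBlock();
    Block stored = dataset.getStoredBlock(blk.getBlockPoolId(), blk.getBlockId());
    return dataset.findBlockFile(blk.getBlockPoolId(), stored.getBlockId());
  }
}
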
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java Fri Apr 29 18:16:32 2011
@@ -73,8 +73,8 @@ public class TestFileStatus {
     conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
     cluster = new MiniDFSCluster.Builder(conf).build();
     fs = cluster.getFileSystem();
-    fc = FileContext.getFileContext(cluster.getURI(), conf);
-    hftpfs = cluster.getHftpFileSystem();
+    fc = FileContext.getFileContext(cluster.getURI(0), conf);
+    hftpfs = cluster.getHftpFileSystem(0);
     dfsClient = new DFSClient(NameNode.getAddress(conf), conf);
     file1 = new Path("filestatus.dat");
     writeFile(fs, file1, 1, fileSize, blockSize);
@@ -294,7 +294,7 @@ public class TestFileStatus {
       fs.setPermission(dir, new FsPermission((short)0));
       try {
         final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
-        final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, "somegroup");
+        final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, 0, "somegroup");
         hftp2.getContentSummary(dir);
         fail();
       } catch(IOException ioe) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java Fri Apr 29 18:16:32 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.test.GenericTestUtils;
 
 /**
  * This test checks correctness of port usage by hdfs components:
@@ -103,7 +104,7 @@ public class TestHDFSServerPorts extends
       NameNode.setServiceAddress(config, THIS_HOST);      
     }
     config.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, THIS_HOST);
-    NameNode.format(config);
+    GenericTestUtils.formatNamenode(config);
 
     String[] args = new String[] {};
     // NameNode will modify config with the ports it bound to
@@ -261,7 +262,7 @@ public class TestHDFSServerPorts extends
       Configuration conf2 = new HdfsConfiguration(config);
       conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
           fileAsURI(new File(hdfsDir, "name2")).toString());
-      NameNode.format(conf2);
+      GenericTestUtils.formatNamenode(conf2);
       boolean started = canStartNameNode(conf2);
       assertFalse(started); // should fail
 

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java Fri Apr 29 18:16:32 2011
@@ -20,9 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 import java.net.URL;
-import java.net.URLConnection;
 import java.net.HttpURLConnection;
-import java.util.Arrays;
 import java.util.Random;
 
 import junit.extensions.TestSetup;
@@ -39,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.log4j.Level;
 
@@ -54,6 +53,7 @@ public class TestHftpFileSystem extends 
   private static MiniDFSCluster cluster = null;
   private static FileSystem hdfs = null;
   private static HftpFileSystem hftpFs = null;
+  private static String blockPoolId = null;
   
   /**
    * Setup hadoop mini-cluster for test.
@@ -70,6 +70,7 @@ public class TestHftpFileSystem extends 
 
     cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
     hdfs = cluster.getFileSystem();
+    blockPoolId = cluster.getNamesystem().getBlockPoolId();
     final String hftpuri = 
       "hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
     hftpFs = (HftpFileSystem) new Path(hftpuri).getFileSystem(config);
@@ -119,20 +120,22 @@ public class TestHftpFileSystem extends 
     
     BlockLocation[] locations = 
         hdfs.getFileBlockLocations(TEST_FILE, 0, 10);
+    
     String locationName = locations[0].getNames()[0];
     URL u = hftpFs.getNamenodeFileURL(TEST_FILE);
     HttpURLConnection conn = (HttpURLConnection)u.openConnection();
-    conn.setFollowRedirects(true);
+    HttpURLConnection.setFollowRedirects(true);
     conn.connect();
     conn.getInputStream();
     boolean checked = false;
     // Find the datanode that has the block according to locations
     // and check that the URL was redirected to this DN's info port
     for (DataNode node : cluster.getDataNodes()) {
-      if (node.getDatanodeRegistration().getName().equals(locationName)) {
+      DatanodeRegistration dnR = 
+        DataNodeTestUtils.getDNRegistrationForBP(node, blockPoolId);
+      if (dnR.getName().equals(locationName)) {
         checked = true;
-        assertEquals(node.getDatanodeRegistration().getInfoPort(),
-                    conn.getURL().getPort());
+        assertEquals(dnR.getInfoPort(), conn.getURL().getPort());
       }
     }
     assertTrue("The test never checked that location of " + 

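The TestHftpFileSystem hunk above replaces the datanode's single dnRegistration field with a per-block-pool registration fetched through DataNodeTestUtils. A sketch of that lookup, limited to calls shown in the hunks; the class and method names are made up, and the IOException clause is an assumption:

package org.apache.hadoop.hdfs;

import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

class DnRegistrationLookup {  // hypothetical helper class
  /** Print each datanode's name and info port for the cluster's block pool. */
  static void printRegistrations(MiniDFSCluster cluster) throws IOException {
    String bpid = cluster.getNamesystem().getBlockPoolId();
    for (DataNode dn : cluster.getDataNodes()) {
      // One registration per block pool, instead of a single dnRegistration per datanode.
      DatanodeRegistration dnR = DataNodeTestUtils.getDNRegistrationForBP(dn, bpid);
      System.out.println(dnR.getName() + " serves HTTP on port " + dnR.getInfoPort());
    }
  }
}
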
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java Fri Apr 29 18:16:32 2011
@@ -140,24 +140,18 @@ public class TestInjectionForSimulatedSt
       //first time format
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
       cluster.waitActive();
+      String bpid = cluster.getNamesystem().getBlockPoolId();
       DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                             cluster.getNameNodePort()),
                                             conf);
       
       writeFile(cluster.getFileSystem(), testPath, numDataNodes);
-
-      
       waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, 20);
-
-      
-      Iterable<Block>[] blocksList = cluster.getAllBlockReports();
-                    
+      Iterable<Block>[] blocksList = cluster.getAllBlockReports(bpid);
       
       cluster.shutdown();
       cluster = null;
       
-
-      
       /* Start the MiniDFSCluster with more datanodes since once a writeBlock
        * to a datanode node fails, same block can not be written to it
        * immediately. In our case some replication attempts will fail.

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java Fri Apr 29 18:16:32 2011
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -34,7 +35,7 @@ public class TestLeaseRecovery extends j
   static final short REPLICATION_NUM = (short)3;
   private static final long LEASE_PERIOD = 300L;
 
-  static void checkMetaInfo(Block b, DataNode dn
+  static void checkMetaInfo(ExtendedBlock b, DataNode dn
       ) throws IOException {
     TestInterDatanodeProtocol.checkMetaInfo(b, dn);
   }
@@ -96,7 +97,7 @@ public class TestLeaseRecovery extends j
       }
       
       //verify Block Info
-      Block lastblock = locatedblock.getBlock();
+      ExtendedBlock lastblock = locatedblock.getBlock();
       DataNode.LOG.info("newblocks=" + lastblock);
       for(int i = 0; i < REPLICATION_NUM; i++) {
         checkMetaInfo(lastblock, datanodes[i]);
@@ -115,8 +116,8 @@ public class TestLeaseRecovery extends j
           dfs.dfs.getNamenode(), filestr).getBlock();
       long currentGS = lastblock.getGenerationStamp();
       for(int i = 0; i < REPLICATION_NUM; i++) {
-        updatedmetainfo[i] =
-          datanodes[i].data.getStoredBlock(lastblock.getBlockId());
+        updatedmetainfo[i] = datanodes[i].data.getStoredBlock(lastblock
+            .getBlockPoolId(), lastblock.getBlockId());
         assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
         assertEquals(oldSize, updatedmetainfo[i].getNumBytes());
         assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestListFilesInFileContext.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestListFilesInFileContext.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestListFilesInFileContext.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestListFilesInFileContext.java Fri Apr 29 18:16:32 2011
@@ -64,7 +64,7 @@ public class TestListFilesInFileContext 
   @BeforeClass
   public static void testSetUp() throws Exception {
     cluster = new MiniDFSCluster.Builder(conf).build();
-    fc = FileContext.getFileContext(cluster.getConfiguration());
+    fc = FileContext.getFileContext(cluster.getConfiguration(0));
     fc.delete(TEST_DIR, true);
   }
   

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java Fri Apr 29 18:16:32 2011
@@ -62,7 +62,7 @@ public class TestListPathServlet {
     final String str = "hftp://"
         + CONF.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
     hftpURI = new URI(str);
-    hftpFs = cluster.getHftpFileSystem();
+    hftpFs = cluster.getHftpFileSystem(0);
   }
 
   @AfterClass
@@ -110,7 +110,7 @@ public class TestListPathServlet {
     checkStatus("/nonexistent/a");
 
     final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
-    final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, CONF, "somegroup");
+    final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, CONF, 0, "somegroup");
     { //test file not found on hftp 
       final Path nonexistent = new Path("/nonexistent");
       try {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java Fri Apr 29 18:16:32 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * The test makes sure that NameNode detects presense blocks that do not have
@@ -67,8 +68,8 @@ public class TestMissingBlocksAlert exte
 
 
       // Corrupt the block
-      String block = DFSTestUtil.getFirstBlock(dfs, corruptFile).getBlockName();
-      assertTrue(cluster.corruptReplica(block, 0));
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, corruptFile);
+      assertTrue(TestDatanodeBlockScanner.corruptReplica(block, 0));
 
       // read the file so that the corrupt block is reported to NN
       FSDataInputStream in = dfs.open(corruptFile); 

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java Fri Apr 29 18:16:32 2011
@@ -103,15 +103,15 @@ public class TestPipelines {
     List<LocatedBlock> lb = cluster.getNameNode().getBlockLocations(
       filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();
 
+    String bpid = cluster.getNamesystem().getBlockPoolId();
     for (DataNode dn : cluster.getDataNodes()) {
-      Replica r = DataNodeAdapter.fetchReplicaInfo(
-        dn, lb.get(0).getBlock().getBlockId());
+      Replica r = DataNodeAdapter.fetchReplicaInfo(dn, bpid, lb.get(0)
+          .getBlock().getBlockId());
 
       assertTrue("Replica on DN " + dn + " shouldn't be null", r != null);
-      assertEquals(
-        "Should be RBW replica on " + dn + " after sequence of calls " +
-        "append()/write()/hflush()",
-        HdfsConstants.ReplicaState.RBW, r.getState());
+      assertEquals("Should be RBW replica on " + dn
+          + " after sequence of calls append()/write()/hflush()",
+          HdfsConstants.ReplicaState.RBW, r.getState());
     }
     ofs.close();
   }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java Fri Apr 29 18:16:32 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
@@ -164,7 +165,7 @@ public class TestReplication extends Tes
     DFSTestUtil.waitReplication(fs, file1, replFactor);
   
     // Corrupt the block belonging to the created file
-    String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
+    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
 
     int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
     assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted); 
@@ -329,28 +330,23 @@ public class TestReplication extends Tes
       waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, -1);
 
       // get first block of the file.
-      String block = dfsClient.getNamenode().
-                       getBlockLocations(testFile, 0, Long.MAX_VALUE).
-                       get(0).getBlock().getBlockName();
+      ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(testFile,
+          0, Long.MAX_VALUE).get(0).getBlock();
       
       cluster.shutdown();
       cluster = null;
       
-      //Now mess up some of the replicas.
-      //Delete the first and corrupt the next two.
-      File baseDir = new File(System.getProperty("test.build.data"), 
-                                                 "dfs/data");
       for (int i=0; i<25; i++) {
         buffer[i] = '0';
       }
       
       int fileCount = 0;
-      for (int i=0; i<6; i++) {
-        File blockFile = new File(baseDir, "data" + (i+1) + 
-            MiniDFSCluster.FINALIZED_DIR_NAME + block);
+      // Choose 3 copies of block file - delete 1 and corrupt the remaining 2
+      for (int dnIndex=0; dnIndex<3; dnIndex++) {
+        File blockFile = MiniDFSCluster.getBlockFile(dnIndex, block);
         LOG.info("Checking for file " + blockFile);
         
-        if (blockFile.exists()) {
+        if (blockFile != null && blockFile.exists()) {
           if (fileCount == 0) {
             LOG.info("Deleting file " + blockFile);
             assertTrue(blockFile.delete());
@@ -429,7 +425,7 @@ public class TestReplication extends Tes
     DFSTestUtil.createFile(fs, fileName, fileLen, REPLICATION_FACTOR, 0);
     DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
 
-    String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
+    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
 
     // Change the length of a replica
     for (int i=0; i<cluster.getDataNodes().size(); i++) {

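The TestReplication and TestMissingBlocksAlert hunks above drop block-name strings and hand-built data-dir paths in favor of ExtendedBlock plus MiniDFSCluster helpers. A sketch of that pattern, using only calls visible in the hunks; the class and method names are made up:

package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

class ReplicaCorruptionSketch {  // hypothetical helper class
  /** Locate the replica files of a file's first block and corrupt them on the datanodes. */
  static int corruptFirstBlock(MiniDFSCluster cluster, FileSystem fs, Path file)
      throws IOException {
    // getFirstBlock now returns an ExtendedBlock (block pool id + block).
    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);
    for (int dnIndex = 0; dnIndex < cluster.getDataNodes().size(); dnIndex++) {
      // May be null if this datanode holds no replica of the block.
      File blockFile = MiniDFSCluster.getBlockFile(dnIndex, block);
      System.out.println("replica on DN " + dnIndex + ": " + blockFile);
    }
    // Returns the number of replica files that were actually corrupted.
    return cluster.corruptBlockOnDataNodes(block);
  }
}
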
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java Fri Apr 29 18:16:32 2011
@@ -44,9 +44,11 @@ import static org.apache.hadoop.hdfs.ser
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
 
 /**
  * This class defines a number of static helper methods used by the
@@ -68,12 +70,18 @@ public class UpgradeUtilities {
   private static long namenodeStorageChecksum;
   // The namespaceId of the namenodeStorage directory
   private static int namenodeStorageNamespaceID;
+  // The clusterId of the namenodeStorage directory
+  private static String namenodeStorageClusterID;
+  // The blockpoolId of the namenodeStorage directory
+  private static String namenodeStorageBlockPoolID;
   // The fsscTime of the namenodeStorage directory
   private static long namenodeStorageFsscTime;
   // The singleton master storage directory for Datanode
   private static File datanodeStorage = new File(TEST_ROOT_DIR, "datanodeMaster");
   // A checksum of the contents in datanodeStorage directory
   private static long datanodeStorageChecksum;
+  // A checksum of the contents in blockpool storage directory
+  private static long blockPoolStorageChecksum;
 
   /**
    * Initialize the data structures used by this class.  
@@ -97,7 +105,7 @@ public class UpgradeUtilities {
       createEmptyDirs(new String[] {datanodeStorage.toString()});
       
       // format and start NameNode and start DataNode
-      NameNode.format(config); 
+      GenericTestUtils.formatNamenode(config);
       cluster =  new MiniDFSCluster.Builder(config)
                                    .numDataNodes(1)
                                    .startupOption(StartupOption.REGULAR)
@@ -105,9 +113,12 @@ public class UpgradeUtilities {
                                    .manageDataDfsDirs(false)
                                    .manageNameDfsDirs(false)
                                    .build();
+        
       NameNode namenode = cluster.getNameNode();
       namenodeStorageNamespaceID = namenode.versionRequest().getNamespaceID();
       namenodeStorageFsscTime = namenode.versionRequest().getCTime();
+      namenodeStorageClusterID = namenode.versionRequest().getClusterID();
+      namenodeStorageBlockPoolID = namenode.versionRequest().getBlockPoolID();
       
       FileSystem fs = FileSystem.get(config);
       Path baseDir = new Path("/TestUpgrade");
@@ -135,10 +146,15 @@ public class UpgradeUtilities {
       FileUtil.fullyDelete(new File(namenodeStorage,"in_use.lock"));
       FileUtil.fullyDelete(new File(datanodeStorage,"in_use.lock"));
     }
-    namenodeStorageChecksum = checksumContents(
-                                               NAME_NODE, new File(namenodeStorage,"current"));
-    datanodeStorageChecksum = checksumContents(
-                                               DATA_NODE, new File(datanodeStorage,"current"));
+    namenodeStorageChecksum = checksumContents(NAME_NODE, 
+        new File(namenodeStorage, "current"));
+    File dnCurDir = new File(datanodeStorage, "current");
+    datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir);
+    
+    String bpid = cluster.getNamesystem(0).getBlockPoolId();
+    File bpCurDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
+        "current");
+    blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir);
   }
   
   // Private helper method that writes a file to the given file system.
@@ -191,14 +207,26 @@ public class UpgradeUtilities {
   
   /**
    * Return the checksum for the singleton master storage directory
-   * of the given node type.
+   * for namenode
    */
-  public static long checksumMasterContents(NodeType nodeType) throws IOException {
-    if (nodeType == NAME_NODE) {
-      return namenodeStorageChecksum;
-    } else {
-      return datanodeStorageChecksum;
-    }
+  public static long checksumMasterNameNodeContents() {
+    return namenodeStorageChecksum;
+  }
+  
+  /**
+   * Return the checksum for the singleton master storage directory
+   * for datanode
+   */
+  public static long checksumMasterDataNodeContents() {
+    return datanodeStorageChecksum;
+  }
+  
+  /**
+   * Return the checksum for the singleton master storage directory
+   * for block pool.
+   */
+  public static long checksumMasterBlockPoolContents() {
+    return blockPoolStorageChecksum;
   }
   
   /**
@@ -249,75 +277,110 @@ public class UpgradeUtilities {
   }
   
   /**
-   * Simulate the <code>dfs.name.dir</code> or <code>dfs.data.dir</code>
-   * of a populated DFS filesystem.
-   *
-   * This method creates and populates the directory specified by
-   *  <code>parent/dirName</code>, for each parent directory.
-   * The contents of the new directories will be
-   * appropriate for the given node type.  If the directory does not
-   * exist, it will be created.  If the directory already exists, it
-   * will first be deleted.
-   *
-   * By default, a singleton master populated storage
-   * directory is created for a Namenode (contains edits, fsimage,
-   * version, and time files) and a Datanode (contains version and
-   * block files).  These directories are then
-   * copied by this method to create new storage
-   * directories of the appropriate type (Namenode or Datanode).
+   * Simulate the <code>dfs.name.dir</code> of a populated DFS filesystem.
+   * This method populates for each parent directory, <code>parent/dirName</code>
+   * with the content of namenode storage directory that comes from a singleton
+   * namenode master (that contains edits, fsimage, version and time files). 
+   * If the destination directory does not exist, it will be created.  
+   * If the directory already exists, it will first be deleted.
    *
+   * @param parents parent directory where {@code dirName} is created
+   * @param dirName directory under which storage directory is created
    * @return the array of created directories
    */
-  public static File[] createStorageDirs(NodeType nodeType, String[] parents, String dirName) throws Exception {
+  public static File[] createNameNodeStorageDirs(String[] parents,
+      String dirName) throws Exception {
     File[] retVal = new File[parents.length];
     for (int i = 0; i < parents.length; i++) {
       File newDir = new File(parents[i], dirName);
       createEmptyDirs(new String[] {newDir.toString()});
       LocalFileSystem localFS = FileSystem.getLocal(new HdfsConfiguration());
-      switch (nodeType) {
-      case NAME_NODE:
-        localFS.copyToLocalFile(new Path(namenodeStorage.toString(), "current"),
-                                new Path(newDir.toString()),
-                                false);
-        break;
-      case DATA_NODE:
-        localFS.copyToLocalFile(new Path(datanodeStorage.toString(), "current"),
-                                new Path(newDir.toString()),
-                                false);
-        break;
-      }
+      localFS.copyToLocalFile(new Path(namenodeStorage.toString(), "current"),
+                              new Path(newDir.toString()),
+                              false);
+      retVal[i] = newDir;
+    }
+    return retVal;
+  }  
+  
+  /**
+   * Simulate the <code>dfs.data.dir</code> of a populated DFS filesystem.
+   * This method populates for each parent directory, <code>parent/dirName</code>
+   * with the content of datanode storage directory that comes from a singleton
+   * datanode master (that contains version and block files). If the destination
+   * directory does not exist, it will be created.  If the directory already 
+   * exists, it will first be deleted.
+   * 
+   * @param parents parent directory where {@code dirName} is created
+   * @param dirName directory under which storage directory is created
+   * @return the array of created directories
+   */
+  public static File[] createDataNodeStorageDirs(String[] parents,
+      String dirName) throws Exception {
+    File[] retVal = new File[parents.length];
+    for (int i = 0; i < parents.length; i++) {
+      File newDir = new File(parents[i], dirName);
+      createEmptyDirs(new String[] {newDir.toString()});
+      LocalFileSystem localFS = FileSystem.getLocal(new HdfsConfiguration());
+      localFS.copyToLocalFile(new Path(datanodeStorage.toString(), "current"),
+                              new Path(newDir.toString()),
+                              false);
       retVal[i] = newDir;
     }
     return retVal;
   }
   
   /**
-   * Create a <code>version</code> file inside the specified parent
+   * Simulate the <code>dfs.data.dir</code> of a populated DFS filesystem.
+   * This method populates for each parent directory, <code>parent/dirName</code>
+   * with the content of block pool storage directory that comes from a singleton
+   * datanode master (that contains version and block files). If the destination
+   * directory does not exist, it will be created.  If the directory already 
+   * exists, it will first be deleted.
+   * 
+   * @param parents parent directory where {@code dirName} is created
+   * @param dirName directory under which storage directory is created
+   * @param bpid block pool id for which the storage directory is created.
+   * @return the array of created directories
+   */
+  public static File[] createBlockPoolStorageDirs(String[] parents,
+      String dirName, String bpid) throws Exception {
+    File[] retVal = new File[parents.length];
+    Path bpCurDir = new Path(MiniDFSCluster.getBPDir(datanodeStorage,
+        bpid, Storage.STORAGE_DIR_CURRENT));
+    for (int i = 0; i < parents.length; i++) {
+      File newDir = new File(parents[i] + "/current/" + bpid, dirName);
+      createEmptyDirs(new String[] {newDir.toString()});
+      LocalFileSystem localFS = FileSystem.getLocal(new HdfsConfiguration());
+      localFS.copyToLocalFile(bpCurDir,
+                              new Path(newDir.toString()),
+                              false);
+      retVal[i] = newDir;
+    }
+    return retVal;
+  }
+  
+  /**
+   * Create a <code>version</code> file for namenode inside the specified parent
    * directory.  If such a file already exists, it will be overwritten.
    * The given version string will be written to the file as the layout
    * version. None of the parameters may be null.
    *
-   * @param version
+   * @param parent directory where namenode VERSION file is stored
+   * @param version StorageInfo to create VERSION file from
+   * @param bpid Block pool Id
    *
    * @return the created version file
    */
-  public static File[] createVersionFile(Configuration conf, NodeType nodeType, File[] parent,
-                                         StorageInfo version) throws IOException 
-  {
+  public static File[] createNameNodeVersionFile(Configuration conf,
+      File[] parent, StorageInfo version, String bpid) throws IOException {
     Storage storage = null;
     File[] versionFiles = new File[parent.length];
     for (int i = 0; i < parent.length; i++) {
       File versionFile = new File(parent[i], "VERSION");
       FileUtil.fullyDelete(versionFile);
-      switch (nodeType) {
-      case NAME_NODE:
-        storage = new NNStorage(conf);
-        storage.setStorageInfo(version);
-        break;
-      case DATA_NODE:
-        storage = new DataStorage(version, "doNotCare");
-        break;
-      }
+      storage = new NNStorage(conf);
+      storage.setStorageInfo(version);
       StorageDirectory sd = storage.new StorageDirectory(parent[i].getParentFile());
       sd.write(versionFile);
       versionFiles[i] = versionFile;
@@ -326,6 +389,62 @@ public class UpgradeUtilities {
   }
   
   /**
+   * Create a <code>version</code> file for datanode inside the specified parent
+   * directory.  If such a file already exists, it will be overwritten.
+   * The given version string will be written to the file as the layout
+   * version. None of the parameters may be null.
+   *
+   * @param parent directory where namenode VERSION file is stored
+   * @param version StorageInfo to create VERSION file from
+   * @param bpid Block pool Id
+   */
+  public static void createDataNodeVersionFile(File[] parent,
+      StorageInfo version, String bpid) throws IOException {
+    createDataNodeVersionFile(parent, version, bpid, bpid);
+  }
+  
+  /**
+   * Create a <code>version</code> file for datanode inside the specified parent
+   * directory.  If such a file already exists, it will be overwritten.
+   * The given version string will be written to the file as the layout
+   * version. None of the parameters may be null.
+   *
+   * @param parent directory where namenode VERSION file is stored
+   * @param version StorageInfo to create VERSION file from
+   * @param bpid Block pool Id
+   * @param bpidToWrite Block pool Id to write into the version file
+   */
+  public static void createDataNodeVersionFile(File[] parent,
+      StorageInfo version, String bpid, String bpidToWrite) throws IOException {
+    DataStorage storage = null;
+    File[] versionFiles = new File[parent.length];
+    for (int i = 0; i < parent.length; i++) {
+      File versionFile = new File(parent[i], "VERSION");
+      FileUtil.fullyDelete(versionFile);
+      storage = new DataStorage(version, "doNotCare");
+      StorageDirectory sd = storage.new StorageDirectory(parent[i].getParentFile());
+      sd.write(versionFile);
+      versionFiles[i] = versionFile;
+      File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]);
+      createBlockPoolVersionFile(bpDir, version, bpidToWrite);
+    }
+  }
+  
+  public static void createBlockPoolVersionFile(File bpDir,
+      StorageInfo version, String bpid) throws IOException {
+    // Create block pool version files
+    if (version.layoutVersion < Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      File bpCurDir = new File(bpDir, Storage.STORAGE_DIR_CURRENT);
+      BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(version,
+          bpid);
+      File versionFile = new File(bpCurDir, "VERSION");
+      FileUtil.fullyDelete(versionFile);
+      StorageDirectory sd = bpStorage.new StorageDirectory(bpDir);
+      sd.write(versionFile);
+    }
+  }
+  
+  /**
    * Corrupt the specified file.  Some random bytes within the file
    * will be changed to some random values.
    *
@@ -372,6 +491,28 @@ public class UpgradeUtilities {
   }
   
   /**
+   * Return the cluster ID inherent in the currently running
+   * Namenode. 
+   */
+  public static String getCurrentClusterID(MiniDFSCluster cluster) throws IOException {
+    if (cluster != null) {
+      return cluster.getNameNode().versionRequest().getClusterID();
+    }
+    return namenodeStorageClusterID;
+  }
+  
+  /**
+   * Return the blockpool ID inherent in the currently running
+   * Namenode. 
+   */
+  public static String getCurrentBlockPoolID(MiniDFSCluster cluster) throws IOException {
+    if (cluster != null) {
+      return cluster.getNameNode().versionRequest().getBlockPoolID();
+    }
+    return namenodeStorageBlockPoolID;
+  }
+  
+  /**
    * Return the File System State Creation Timestamp (FSSCTime) inherent
    * in the currently running Namenode.  If no Namenode is running,
    * return the FSSCTime of the master Namenode storage directory.
@@ -385,5 +526,19 @@ public class UpgradeUtilities {
     }
     return namenodeStorageFsscTime;
   }
+
+  /**
+   * Create empty block pool directories
+   * @return array of block pool directories
+   */
+  public static String[] createEmptyBPDirs(String[] baseDirs, String bpid)
+      throws IOException {
+    String[] bpDirs = new String[baseDirs.length];
+    for (int i = 0; i < baseDirs.length; i++) {
+      bpDirs[i] = MiniDFSCluster.getBPDir(new File(baseDirs[i]), bpid);
+    }
+    createEmptyDirs(bpDirs);
+    return bpDirs;
+  }
 }
 

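UpgradeUtilities above splits the old nodeType switch into per-role helpers and adds block pool storage next to the namenode and datanode master copies. A sketch of how the new helpers fit together; the class, the method name, and the "current" dirName argument are assumptions, and the singleton master storage is assumed to have been initialized already:

package org.apache.hadoop.hdfs;

import java.io.File;

class UpgradeStorageSketch {  // hypothetical helper class
  /** Populate namenode, datanode and block pool storage dirs and print their master checksums. */
  static void populateStorage(MiniDFSCluster cluster, String[] nameDirs, String[] dataDirs)
      throws Exception {
    // Per-role helpers replace the old createStorageDirs(nodeType, ...) switch.
    File[] nnDirs = UpgradeUtilities.createNameNodeStorageDirs(nameDirs, "current");
    File[] dnDirs = UpgradeUtilities.createDataNodeStorageDirs(dataDirs, "current");

    // Block pool storage is created under each data dir for the running pool.
    String bpid = UpgradeUtilities.getCurrentBlockPoolID(cluster);
    File[] bpDirs = UpgradeUtilities.createBlockPoolStorageDirs(dataDirs, "current", bpid);
    System.out.println(nnDirs.length + " NN dirs, " + dnDirs.length + " DN dirs, "
        + bpDirs.length + " block pool dirs");

    // Each role now has its own master checksum instead of checksumMasterContents(nodeType).
    System.out.println("NN checksum: " + UpgradeUtilities.checksumMasterNameNodeContents());
    System.out.println("DN checksum: " + UpgradeUtilities.checksumMasterDataNodeContents());
    System.out.println("BP checksum: " + UpgradeUtilities.checksumMasterBlockPoolContents());
  }
}
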
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Fri Apr 29 18:16:32 2011
@@ -29,8 +29,16 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.io.TestWritable;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -45,6 +53,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.log4j.Level;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
@@ -83,9 +92,9 @@ public class TestBlockToken {
 
   long blockKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
   long blockTokenLifetime = 2 * 60 * 1000; // 2 mins
-  Block block1 = new Block(0L);
-  Block block2 = new Block(10L);
-  Block block3 = new Block(-108L);
+  ExtendedBlock block1 = new ExtendedBlock("0", 0L);
+  ExtendedBlock block2 = new ExtendedBlock("10", 10L);
+  ExtendedBlock block3 = new ExtendedBlock("-10", -108L);
 
   private static class getLengthAnswer implements Answer<Long> {
     BlockTokenSecretManager sm;
@@ -101,7 +110,7 @@ public class TestBlockToken {
     public Long answer(InvocationOnMock invocation) throws IOException {
       Object args[] = invocation.getArguments();
       assertEquals(1, args.length);
-      Block block = (Block) args[0];
+      ExtendedBlock block = (ExtendedBlock) args[0];
       Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
           .getTokenIdentifiers();
       assertEquals("Only one BlockTokenIdentifier expected", 1, tokenIds.size());
@@ -118,7 +127,7 @@ public class TestBlockToken {
   }
 
   private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
-      Block block, EnumSet<BlockTokenSecretManager.AccessMode> accessModes)
+      ExtendedBlock block, EnumSet<BlockTokenSecretManager.AccessMode> accessModes)
       throws IOException {
     Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
     BlockTokenIdentifier id = sm.createIdentifier();
@@ -203,7 +212,7 @@ public class TestBlockToken {
     id.readFields(new DataInputStream(new ByteArrayInputStream(token
         .getIdentifier())));
     doAnswer(new getLengthAnswer(sm, id)).when(mockDN).getReplicaVisibleLength(
-        any(Block.class));
+        any(ExtendedBlock.class));
 
     final Server server = RPC.getServer(ClientDatanodeProtocol.class, mockDN,
         ADDRESS, 0, 5, true, conf, sm);
@@ -228,5 +237,73 @@ public class TestBlockToken {
       }
     }
   }
+  
+  /** 
+   * Test {@link BlockPoolTokenSecretManager}
+   */
+  @Test
+  public void testBlockPoolTokenSecretManager() throws Exception {
+    BlockPoolTokenSecretManager bpMgr = new BlockPoolTokenSecretManager();
+    
+    // Test BlockPoolSecretManager with upto 10 block pools
+    for (int i = 0; i < 10; i++) {
+      String bpid = Integer.toString(i);
+      BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true,
+          blockKeyUpdateInterval, blockTokenLifetime);
+      BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false,
+          blockKeyUpdateInterval, blockTokenLifetime);
+      bpMgr.addBlockPool(bpid, slaveHandler);
+      
+      
+      ExportedBlockKeys keys = masterHandler.exportKeys();
+      bpMgr.setKeys(bpid, keys);
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
+      
+      // Test key updating
+      masterHandler.updateKeys();
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
+      keys = masterHandler.exportKeys();
+      bpMgr.setKeys(bpid, keys);
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
+    }
+  }
+  
+  /**
+   * This test writes a file and gets the block locations without closing
+   * the file, and tests the block token in the last block. Block token is
+   * verified by ensuring it is of correct kind.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testBlockTokenInLastLocatedBlock() throws IOException,
+      InterruptedException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numNameNodes(1)
+        .numDataNodes(1).build();
+    cluster.waitActive();
 
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      String fileName = "/testBlockTokenInLastLocatedBlock";
+      Path filePath = new Path(fileName);
+      FSDataOutputStream out = fs.create(filePath, (short) 1);
+      out.write(new byte[1000]);
+      LocatedBlocks locatedBlocks = cluster.getNameNode().getBlockLocations(
+          fileName, 0, 1000);
+      while (locatedBlocks.getLastLocatedBlock() == null) {
+        Thread.sleep(100);
+        locatedBlocks = cluster.getNameNode().getBlockLocations(fileName, 0,
+            1000);
+      }
+      Token<BlockTokenIdentifier> token = locatedBlocks.getLastLocatedBlock()
+          .getBlockToken();
+      Assert.assertEquals(BlockTokenIdentifier.KIND_NAME, token.getKind());
+      out.close();
+    } finally {
+      cluster.shutdown();
+    }
+  } 
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Fri Apr 29 18:16:32 2011
@@ -18,33 +18,41 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeoutException;
 
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.mortbay.log.Log;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
-import junit.framework.TestCase;
 /**
  * This class tests if a balancer schedules tasks correctly.
  */
 public class TestBalancer extends TestCase {
+  private static final Log LOG = LogFactory.getLog(
+  "org.apache.hadoop.hdfs.TestReplication");
+  
   final private static long CAPACITY = 500L;
   final private static String RACK0 = "/rack0";
   final private static String RACK1 = "/rack1";
@@ -59,14 +67,13 @@ public class TestBalancer extends TestCa
   static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
   static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
   static final int DEFAULT_BLOCK_SIZE = 10;
-  private Balancer balancer;
-  private Random r = new Random();
+  private static final Random r = new Random();
 
   static {
     Balancer.setBlockMoveWaitTime(1000L) ;
   }
 
-  private void initConf(Configuration conf) {
+  static void initConf(Configuration conf) {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
@@ -87,7 +94,8 @@ public class TestBalancer extends TestCa
   /* fill up a cluster with <code>numNodes</code> datanodes 
    * whose used space to be <code>size</code>
    */
-  private Block[] generateBlocks(Configuration conf, long size, short numNodes) throws IOException {
+  private ExtendedBlock[] generateBlocks(Configuration conf, long size,
+      short numNodes) throws IOException {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numNodes).build();
     try {
       cluster.waitActive();
@@ -101,10 +109,11 @@ public class TestBalancer extends TestCa
       getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
 
       int numOfBlocks = locatedBlocks.size();
-      Block[] blocks = new Block[numOfBlocks];
+      ExtendedBlock[] blocks = new ExtendedBlock[numOfBlocks];
       for(int i=0; i<numOfBlocks; i++) {
-        Block b = locatedBlocks.get(i).getBlock();
-        blocks[i] = new Block(b.getBlockId(), b.getNumBytes(), b.getGenerationStamp());
+        ExtendedBlock b = locatedBlocks.get(i).getBlock();
+        blocks[i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b
+            .getNumBytes(), b.getGenerationStamp());
       }
 
       return blocks;
@@ -114,8 +123,8 @@ public class TestBalancer extends TestCa
   }
 
   /* Distribute all blocks according to the given distribution */
-  Block[][] distributeBlocks(Block[] blocks, short replicationFactor, 
-      final long[] distribution ) {
+  static Block[][] distributeBlocks(ExtendedBlock[] blocks,
+      short replicationFactor, final long[] distribution) {
     // make a copy
     long[] usedSpace = new long[distribution.length];
     System.arraycopy(distribution, 0, usedSpace, 0, distribution.length);
@@ -133,7 +142,7 @@ public class TestBalancer extends TestCa
           int chosenIndex = r.nextInt(usedSpace.length);
           if( usedSpace[chosenIndex]>0 ) {
             notChosen = false;
-            blockReports.get(chosenIndex).add(blocks[i]);
+            blockReports.get(chosenIndex).add(blocks[i].getLocalBlock());
             usedSpace[chosenIndex] -= blocks[i].getNumBytes();
           }
         }
@@ -146,6 +155,14 @@ public class TestBalancer extends TestCa
     return results;
   }
 
+  static long sum(long[] x) {
+    long s = 0L;
+    for(long a : x) {
+      s += a;
+    }
+    return s;
+  }
+
   /* we first start a cluster and fill the cluster up to a certain size.
    * then redistribute blocks according the required distribution.
    * Afterwards a balancer is running to balance the cluster.
@@ -158,13 +175,11 @@ public class TestBalancer extends TestCa
     }
 
     // calculate total space that need to be filled
-    long totalUsedSpace=0L;
-    for(int i=0; i<distribution.length; i++) {
-      totalUsedSpace += distribution[i];
-    }
+    final long totalUsedSpace = sum(distribution);
 
     // fill the cluster
-    Block[] blocks = generateBlocks(conf, totalUsedSpace, (short)numDatanodes);
+    ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace,
+        (short) numDatanodes);
 
     // redistribute blocks
     Block[][] blocksDN = distributeBlocks(
@@ -183,10 +198,7 @@ public class TestBalancer extends TestCa
     for(int i = 0; i < blocksDN.length; i++)
       cluster.injectBlocks(i, Arrays.asList(blocksDN[i]));
 
-    long totalCapacity = 0L;
-    for(long capacity:capacities) {
-      totalCapacity += capacity;
-    }
+    final long totalCapacity = sum(capacities);
     runBalancer(conf, totalUsedSpace, totalCapacity);
     cluster.shutdown();
   }
@@ -295,10 +307,8 @@ public class TestBalancer extends TestCa
       cluster.waitActive();
       client = DFSClient.createNamenode(conf);
 
-      long totalCapacity=0L;
-      for(long capacity:capacities) {
-        totalCapacity += capacity;
-      }
+      long totalCapacity = sum(capacities);
+      
       // fill up the cluster to be 30% full
       long totalUsedSpace = totalCapacity*3/10;
       createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
@@ -315,31 +325,18 @@ public class TestBalancer extends TestCa
     }
   }
 
-  /* Start balancer and check if the cluster is balanced after the run */
-  private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity )
-  throws Exception {
-    waitForHeartBeat(totalUsedSpace, totalCapacity);
-
-    // start rebalancing
-    balancer = new Balancer(conf);
-    balancer.run(new String[0]);
-
-    waitForHeartBeat(totalUsedSpace, totalCapacity);
-    Log.info("Rebalancing.");
-    waitForBalancer(totalUsedSpace, totalCapacity);
-  }
-
-  private void runBalancerDefaultConstructor(Configuration conf,
+  private void runBalancer(Configuration conf,
       long totalUsedSpace, long totalCapacity) throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity);
 
     // start rebalancing
-    balancer = new Balancer();
-    balancer.setConf(conf);
-    balancer.run(new String[0]);
+    final List<InetSocketAddress> namenodes = new ArrayList<InetSocketAddress>();
+    namenodes.add(NameNode.getServiceAddress(conf, true));
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+    assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
-    Log.info("Rebalancing with default ctor.");
+    LOG.info("Rebalancing.");
     waitForBalancer(totalUsedSpace, totalCapacity);
   }
   
@@ -401,10 +398,8 @@ public class TestBalancer extends TestCa
       cluster.waitActive();
       client = DFSClient.createNamenode(conf);
 
-      long totalCapacity = 0L;
-      for (long capacity : capacities) {
-        totalCapacity += capacity;
-      }
+      long totalCapacity = sum(capacities);
+
       // fill up the cluster to be 30% full
       long totalUsedSpace = totalCapacity * 3 / 10;
       createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
@@ -415,7 +410,7 @@ public class TestBalancer extends TestCa
       totalCapacity += newCapacity;
 
       // run balancer and validate results
-      runBalancerDefaultConstructor(conf, totalUsedSpace, totalCapacity);
+      runBalancer(conf, totalUsedSpace, totalCapacity);
     } finally {
       cluster.shutdown();
     }

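The consolidated runBalancer() above drops the Balancer constructors in favor of the static, federation-aware entry point. Pulled out of the test, the call pattern looks roughly like the sketch below; only the calls visible in the hunks are taken from this commit, while the package imports and the surrounding class name are assumptions.

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.balancer.Balancer;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;

    public class BalancerRunSketch {
      /** Run the balancer against the configured namenode(s) and return its status code. */
      static int balanceCluster(Configuration conf) throws Exception {
        // One service address per namenode; a federated cluster would add several.
        final List<InetSocketAddress> namenodes = new ArrayList<InetSocketAddress>();
        namenodes.add(NameNode.getServiceAddress(conf, true));
        // Parameters.DEFALUT is the default-parameter constant the test passes;
        // the test compares the result against Balancer.ReturnStatus.SUCCESS.code.
        return Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
      }
    }
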
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java Fri Apr 29 18:16:32 2011
@@ -17,14 +17,15 @@
 */
 package org.apache.hadoop.hdfs.server.common;
 
+import static org.apache.hadoop.hdfs.protocol.FSConstants.LAYOUT_VERSION;
+
 import java.io.IOException;
+
 import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-
-import static org.apache.hadoop.hdfs.protocol.FSConstants.LAYOUT_VERSION;
-
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hdfs.TestDFSUpg
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.UpgradeObjectDatanode;
 import org.apache.hadoop.hdfs.server.namenode.UpgradeObjectNamenode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 
@@ -42,6 +44,7 @@ public class TestDistributedUpgrade exte
   private Configuration conf;
   private int testCounter = 0;
   private MiniDFSCluster cluster = null;
+  private String clusterId = "testClusterId";
     
   /**
    * Writes an INFO log message containing the parameters.
@@ -64,9 +67,10 @@ public class TestDistributedUpgrade exte
       // nn dirs set to name1 and name2
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                               .format(false)
+                                              .clusterId(clusterId)
                                               .startupOption(operation)
                                               .build(); // should fail
-      throw new AssertionError("Jakob was here. NameNode should have failed to start");
+      throw new AssertionError("NameNode should have failed to start");
     } catch (Exception expected) {
       expected = null;
       // expected
@@ -94,7 +98,7 @@ public class TestDistributedUpgrade exte
     TestDFSUpgradeFromImage testImg = new TestDFSUpgradeFromImage();
     testImg.unpackStorage();
     int numDNs = testImg.numDataNodes;
-
+    
     // register new upgrade objects (ignore all existing)
     UpgradeObjectCollection.initialize();
     UpgradeObjectCollection.registerUpgrade(new UO_Datanode1());
@@ -118,6 +122,7 @@ public class TestDistributedUpgrade exte
     // .startupOption(StartupOption.UPGRADE).build();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                               .format(false)
+                                              .clusterId(clusterId)
                                               .startupOption(StartupOption.UPGRADE)
                                               .build();
     cluster.shutdown();
@@ -132,6 +137,7 @@ public class TestDistributedUpgrade exte
     cluster = new MiniDFSCluster.Builder(conf)
                                 .numDataNodes(numDNs)
                                 .format(false)
+                                .clusterId(clusterId)
                                 .startupOption(StartupOption.UPGRADE)
                                 .build();
     DFSAdmin dfsAdmin = new DFSAdmin();
@@ -143,6 +149,7 @@ public class TestDistributedUpgrade exte
     log("NameCluster regular startup after the upgrade", numDirs);
     cluster = new MiniDFSCluster.Builder(conf)
                                 .numDataNodes(numDNs)
+                                .clusterId(clusterId)
                                 .format(false)
                                 .startupOption(StartupOption.REGULAR)
                                 .build();
@@ -174,7 +181,8 @@ class UO_Datanode extends UpgradeObjectD
 
   public void doUpgrade() throws IOException {
     this.status = (short)100;
-    getDatanode().namenode.processUpgradeCommand(
+    DatanodeProtocol nn = getNamenode();
+    nn.processUpgradeCommand(
         new UpgradeCommand(UpgradeCommand.UC_ACTION_REPORT_STATUS, 
             getVersion(), getUpgradeStatus()));
   }

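The UO_Datanode hunk above is the interesting API shift in this file: an upgrade object no longer reaches into DataNode internals for the namenode reference but asks its base class for a DatanodeProtocol handle. A minimal sketch of that reporting path follows; everything outside the hunk (the sketch class itself, the visibility assumptions on getNamenode() and the inherited status field) is illustrative only.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.datanode.UpgradeObjectDatanode;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
    import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;

    abstract class UpgradeReportSketch extends UpgradeObjectDatanode {
      public void doUpgrade() throws IOException {
        // Mark this upgrade object complete and report that back to the
        // namenode through the DatanodeProtocol handle from the base class.
        this.status = (short)100;
        DatanodeProtocol nn = getNamenode();
        nn.processUpgradeCommand(new UpgradeCommand(
            UpgradeCommand.UC_ACTION_REPORT_STATUS, getVersion(), getUpgradeStatus()));
      }
    }
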
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java Fri Apr 29 18:16:32 2011
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +68,7 @@ public class TestJspHelper {
   public void testGetUgi() throws IOException {
     conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://localhost:4321/");
     HttpServletRequest request = mock(HttpServletRequest.class);
+    ServletContext context = mock(ServletContext.class);
     String user = "TheDoctor";
     Text userText = new Text(user);
     DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText,
@@ -78,19 +80,44 @@ public class TestJspHelper {
         tokenString);
     when(request.getRemoteUser()).thenReturn(user);
 
+    //Test attribute in the url to be used as service in the token.
+    when(request.getParameter(JspHelper.NAMENODE_ADDRESS)).thenReturn(
+        "1.1.1.1:1111");
+
     conf.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
     UserGroupInformation.setConfiguration(conf);
 
-    InetSocketAddress serviceAddr = NameNode.getAddress(conf);
-    Text tokenService = new Text(serviceAddr.getAddress().getHostAddress()
-        + ":" + serviceAddr.getPort());
-
-    UserGroupInformation ugi = JspHelper.getUGI(request, conf);
+    verifyServiceInToken(context, request, "1.1.1.1:1111");
+    
+    //Test attribute name.node.address 
+    //Set the nnaddr url parameter to null.
+    when(request.getParameter(JspHelper.NAMENODE_ADDRESS)).thenReturn(null);
+    InetSocketAddress addr = new InetSocketAddress("localhost", 2222);
+    when(context.getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
+        .thenReturn(addr);
+    verifyServiceInToken(context, request, addr.getAddress().getHostAddress()
+        + ":2222");
+    
+    //Test service already set in the token
+    token.setService(new Text("3.3.3.3:3333"));
+    tokenString = token.encodeToUrlString();
+    //Set the name.node.address attribute in Servlet context to null
+    when(context.getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
+        .thenReturn(null);
+    when(request.getParameter(JspHelper.DELEGATION_PARAMETER_NAME)).thenReturn(
+        tokenString);
+    verifyServiceInToken(context, request, "3.3.3.3:3333");
+  }
+  
+  private void verifyServiceInToken(ServletContext context,
+      HttpServletRequest request, String expected) throws IOException {
+    UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
     Token<? extends TokenIdentifier> tokenInUgi = ugi.getTokens().iterator()
         .next();
-    Assert.assertEquals(tokenInUgi.getService(), tokenService);
+    Assert.assertEquals(expected, tokenInUgi.getService().toString());
   }
   
+  
   @Test
   public void testDelegationTokenUrlParam() {
     conf.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");

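JspHelper.getUGI() now takes the ServletContext in addition to the request, and the test above exercises the service-resolution order for the delegation token: the nnaddr request parameter first, then the NameNode address stored in the servlet context, then whatever service the token already carries. A rough sketch of how a servlet might call the new overload; the servlet scaffolding and the plain Configuration are assumptions, only the getUGI() signature and the attribute keys come from the hunks.

    import java.io.IOException;

    import javax.servlet.ServletContext;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.common.JspHelper;
    import org.apache.hadoop.security.UserGroupInformation;

    public class UgiAwareServletSketch extends HttpServlet {
      private final Configuration conf = new Configuration();

      /** Resolve the calling user, letting JspHelper pick the token service. */
      UserGroupInformation resolveCaller(HttpServletRequest request)
          throws IOException {
        ServletContext context = getServletContext();
        // Service precedence inside getUGI(): the nnaddr request parameter,
        // then the NameNode address attribute held by the servlet context,
        // then the service already encoded in the delegation token.
        return JspHelper.getUGI(context, request, conf);
      }
    }
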
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java Fri Apr 29 18:16:32 2011
@@ -28,11 +28,13 @@ public class DataNodeAdapter {
   /**
    * Fetch a copy of ReplicaInfo from a datanode by block id
    * @param dn datanode to retrieve a replicainfo object from
+   * @param bpid Block pool Id
    * @param blkId id of the replica's block
    * @return copy of ReplicaInfo object @link{FSDataset#fetchReplicaInfo}
    */
   public static ReplicaInfo fetchReplicaInfo (final DataNode dn,
+                                              final String bpid,
                                               final long blkId) {
-    return ((FSDataset)dn.data).fetchReplicaInfo(blkId);
+    return ((FSDataset)dn.data).fetchReplicaInfo(bpid, blkId);
   }
 }
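
With replicas now keyed by block pool as well as block id, callers of the adapter have to supply both. A small sketch of the new call shape, using only types and accessors that appear in the hunks of this commit; the helper class and method names are hypothetical.

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

    class FetchReplicaSketch {
      /** Look up a replica on a datanode; the block pool id is now required. */
      static ReplicaInfo replicaFor(DataNode dn, ExtendedBlock blk) {
        return DataNodeAdapter.fetchReplicaInfo(dn, blk.getBlockPoolId(),
            blk.getBlockId());
      }
    }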