You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/29 20:16:38 UTC

svn commit: r1097905 [12/14] - in /hadoop/hdfs/trunk: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/j...

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Fri Apr 29 18:16:32 2011
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import javax.management.NotCompliantMBeanException;
@@ -34,8 +35,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -91,13 +95,14 @@ public class SimulatedFSDataset  impleme
     SimulatedOutputStream oStream = null;
     private long bytesAcked;
     private long bytesRcvd;
-    BInfo(Block b, boolean forWriting) throws IOException {
+    BInfo(String bpid, Block b, boolean forWriting) throws IOException {
       theBlock = new Block(b);
       if (theBlock.getNumBytes() < 0) {
         theBlock.setNumBytes(0);
       }
-      if (!storage.alloc(theBlock.getNumBytes())) { // expected length - actual length may
-                                          // be more - we find out at finalize
+      if (!storage.alloc(bpid, theBlock.getNumBytes())) { 
+        // expected length - actual length may
+        // be more - we find out at finalize
         DataNode.LOG.warn("Lack of free storage on a block alloc");
         throw new IOException("Creating block, no free space available");
       }
@@ -140,7 +145,8 @@ public class SimulatedFSDataset  impleme
       }
     }
     
-    synchronized void finalizeBlock(long finalSize) throws IOException {
+    synchronized void finalizeBlock(String bpid, long finalSize)
+        throws IOException {
       if (finalized) {
         throw new IOException(
             "Finalizing a block that has already been finalized" + 
@@ -161,12 +167,12 @@ public class SimulatedFSDataset  impleme
       // adjust if necessary
       long extraLen = finalSize - theBlock.getNumBytes();
       if (extraLen > 0) {
-        if (!storage.alloc(extraLen)) {
+        if (!storage.alloc(bpid,extraLen)) {
           DataNode.LOG.warn("Lack of free storage on a block alloc");
           throw new IOException("Creating block, no free space available");
         }
       } else {
-        storage.free(-extraLen);
+        storage.free(bpid, -extraLen);
       }
       theBlock.setNumBytes(finalSize);  
 
@@ -259,12 +265,41 @@ public class SimulatedFSDataset  impleme
     }
   }
   
-  static private class SimulatedStorage {
-    private long capacity;  // in bytes
+  /**
+   * Class is used for tracking block pool storage utilization similar
+   * to {@link BlockPoolSlice}
+   */
+  private static class SimulatedBPStorage {
     private long used;    // in bytes
     
+    long getUsed() {
+      return used;
+    }
+    
+    void alloc(long amount) {
+      used += amount;
+    }
+    
+    void free(long amount) {
+      used -= amount;
+    }
+    
+    SimulatedBPStorage() {
+      used = 0;   
+    }
+  }
+  
+  /**
+   * Class used for tracking datanode level storage utilization similar
+   * to {@link FSVolumeSet}
+   */
+  private static class SimulatedStorage {
+    private Map<String, SimulatedBPStorage> map = 
+      new HashMap<String, SimulatedBPStorage>();
+    private long capacity;  // in bytes
+    
     synchronized long getFree() {
-      return capacity - used;
+      return capacity - getUsed();
     }
     
     synchronized long getCapacity() {
@@ -272,29 +307,55 @@ public class SimulatedFSDataset  impleme
     }
     
     synchronized long getUsed() {
+      long used = 0;
+      for (SimulatedBPStorage bpStorage : map.values()) {
+        used += bpStorage.getUsed();
+      }
       return used;
     }
     
-    synchronized boolean alloc(long amount) {
+    synchronized long getBlockPoolUsed(String bpid) throws IOException {
+      return getBPStorage(bpid).getUsed();
+    }
+    
+    synchronized boolean alloc(String bpid, long amount) throws IOException {
       if (getFree() >= amount) {
-        used += amount;
+        getBPStorage(bpid).alloc(amount);
         return true;
-      } else {
-        return false;    
       }
+      return false;    
     }
     
-    synchronized void free(long amount) {
-      used -= amount;
+    synchronized void free(String bpid, long amount) throws IOException {
+      getBPStorage(bpid).free(amount);
     }
     
     SimulatedStorage(long cap) {
       capacity = cap;
-      used = 0;   
+    }
+    
+    synchronized void addBlockPool(String bpid) {
+      SimulatedBPStorage bpStorage = map.get(bpid);
+      if (bpStorage != null) {
+        return;
+      }
+      map.put(bpid, new SimulatedBPStorage());
+    }
+    
+    synchronized void removeBlockPool(String bpid) {
+      map.remove(bpid);
+    }
+    
+    private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
+      SimulatedBPStorage bpStorage = map.get(bpid);
+      if (bpStorage == null) {
+        throw new IOException("block pool " + bpid + " not found");
+      }
+      return bpStorage;
     }
   }
   
-  private HashMap<Block, BInfo> blockMap = null;
+  private Map<String, Map<Block, BInfo>> blockMap = null;
   private SimulatedStorage storage = null;
   private String storageId;
   
@@ -302,7 +363,9 @@ public class SimulatedFSDataset  impleme
     setConf(conf);
   }
   
-  private SimulatedFSDataset() { // real construction when setConf called.. Uggg
+  // Constructor used for constructing the object using reflection
+  @SuppressWarnings("unused")
+  private SimulatedFSDataset() { // real construction when setConf called..
   }
   
   public Configuration getConf() {
@@ -316,14 +379,12 @@ public class SimulatedFSDataset  impleme
     registerMBean(storageId);
     storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
-    //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() + 
-    //    "Used = " + getDfsUsed() + "Free =" + getRemaining());
-
-    blockMap = new HashMap<Block,BInfo>(); 
+    blockMap = new HashMap<String, Map<Block,BInfo>>(); 
   }
 
-  public synchronized void injectBlocks(Iterable<Block> injectBlocks)
-                                            throws IOException {
+  public synchronized void injectBlocks(String bpid,
+      Iterable<Block> injectBlocks) throws IOException {
+    ExtendedBlock blk = new ExtendedBlock();
     if (injectBlocks != null) {
       int numInjectedBlocks = 0;
       for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
@@ -331,69 +392,95 @@ public class SimulatedFSDataset  impleme
         if (b == null) {
           throw new NullPointerException("Null blocks in block list");
         }
-        if (isValidBlock(b)) {
+        blk.set(bpid, b);
+        if (isValidBlock(blk)) {
           throw new IOException("Block already exists in  block list");
         }
       }
-      HashMap<Block, BInfo> oldBlockMap = blockMap;
-      blockMap = new HashMap<Block,BInfo>(
-          numInjectedBlocks + oldBlockMap.size());
-      blockMap.putAll(oldBlockMap);
+      Map<Block, BInfo> map = blockMap.get(bpid);
+      if (map == null) {
+        map = new HashMap<Block, BInfo>();
+        blockMap.put(bpid, map);
+      }
+      
       for (Block b: injectBlocks) {
-          BInfo binfo = new BInfo(b, false);
-          blockMap.put(binfo.theBlock, binfo);
+        BInfo binfo = new BInfo(bpid, b, false);
+        map.put(binfo.theBlock, binfo);
       }
     }
   }
+  
+  /** Get a map for a given block pool Id */
+  private Map<Block, BInfo> getMap(String bpid) throws IOException {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map == null) {
+      throw new IOException("Non existent blockpool " + bpid);
+    }
+    return map;
+  }
 
-  @Override
-  public synchronized void finalizeBlock(Block b) throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
-    binfo.finalizeBlock(b.getNumBytes());
-
+    binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes());
   }
 
-  @Override
-  public synchronized void unfinalizeBlock(Block b) throws IOException {
+  @Override // FSDatasetInterface
+  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
     if (isValidRbw(b)) {
-      blockMap.remove(b);
+      getMap(b.getBlockPoolId()).remove(b.getLocalBlock()); // blockMap is keyed by bpid; remove from that pool's map
     }
   }
 
   @Override
-  public synchronized BlockListAsLongs getBlockReport() {
-    Block[] blockTable = new Block[blockMap.size()];
-    int count = 0;
-    for (BInfo b : blockMap.values()) {
-      if (b.isFinalized()) {
-        blockTable[count++] = b.theBlock;
+  public synchronized BlockListAsLongs getBlockReport(String bpid) {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    Block[] blockTable = new Block[map == null ? 0 : map.size()]; // pool may be unknown; null is handled below
+    if (map != null) {
+      int count = 0;
+      for (BInfo b : map.values()) {
+        if (b.isFinalized()) {
+          blockTable[count++] = b.theBlock;
+        }
       }
-    }
-    if (count != blockTable.length) {
-      blockTable = Arrays.copyOf(blockTable, count);
+      if (count != blockTable.length) {
+        blockTable = Arrays.copyOf(blockTable, count);
+      }
+    } else {
+      blockTable = new Block[0];
     }
     return new BlockListAsLongs(
         new ArrayList<Block>(Arrays.asList(blockTable)), null);
   }
 
+  @Override // FSDatasetMBean
   public long getCapacity() throws IOException {
     return storage.getCapacity();
   }
 
+  @Override // FSDatasetMBean
   public long getDfsUsed() throws IOException {
     return storage.getUsed();
   }
 
+  @Override // FSDatasetMBean
+  public long getBlockPoolUsed(String bpid) throws IOException {
+    return storage.getBlockPoolUsed(bpid);
+  }
+  
+  @Override // FSDatasetMBean
   public long getRemaining() throws IOException {
     return storage.getFree();
   }
 
-  @Override
-  public synchronized long getLength(Block b) throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public synchronized long getLength(ExtendedBlock b) throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
@@ -402,65 +489,84 @@ public class SimulatedFSDataset  impleme
 
   @Override
   @Deprecated
-  public Replica getReplica(long blockId) {
-    return blockMap.get(new Block(blockId));
+  public Replica getReplica(String bpid, long blockId) {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map != null) {
+      return map.get(new Block(blockId));
+    }
+    return null;
   }
 
   @Override 
-  public synchronized String getReplicaString(long blockId) {
-    final Replica r = blockMap.get(new Block(blockId));
+  public synchronized String getReplicaString(String bpid, long blockId) {
+    Replica r = null;
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map != null) {
+      r = map.get(new Block(blockId));
+    }
     return r == null? "null": r.toString();
   }
 
-  @Override
-  public Block getStoredBlock(long blkid) throws IOException {
-    Block b = new Block(blkid);
-    BInfo binfo = blockMap.get(b);
-    if (binfo == null) {
-      return null;
+  @Override // FSDatasetInterface
+  public Block getStoredBlock(String bpid, long blkid) throws IOException {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map != null) {
+      BInfo binfo = map.get(new Block(blkid));
+      if (binfo == null) {
+        return null;
+      }
+      return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes());
     }
-    b.setGenerationStamp(binfo.getGenerationStamp());
-    b.setNumBytes(binfo.getNumBytes());
-    return b;
+    return null;
   }
 
-  @Override
-  public synchronized void invalidate(Block[] invalidBlks) throws IOException {
+  @Override // FSDatasetInterface
+  public synchronized void invalidate(String bpid, Block[] invalidBlks)
+      throws IOException {
     boolean error = false;
     if (invalidBlks == null) {
       return;
     }
+    final Map<Block, BInfo> map = getMap(bpid);
     for (Block b: invalidBlks) {
       if (b == null) {
         continue;
       }
-      BInfo binfo = blockMap.get(b);
+      BInfo binfo = map.get(b);
       if (binfo == null) {
         error = true;
         DataNode.LOG.warn("Invalidate: Missing block");
         continue;
       }
-      storage.free(binfo.getNumBytes());
+      storage.free(bpid, binfo.getNumBytes());
       blockMap.remove(b);
     }
-      if (error) {
-          throw new IOException("Invalidate: Missing blocks.");
-      }
+    if (error) {
+      throw new IOException("Invalidate: Missing blocks.");
+    }
   }
 
-  @Override
-  public synchronized boolean isValidBlock(Block b) {
-    // return (blockMap.containsKey(b));
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public synchronized boolean isValidBlock(ExtendedBlock b) {
+    final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
+    if (map == null) {
+      return false;
+    }
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       return false;
     }
     return binfo.isFinalized();
   }
 
+  /* check if a block is created but not finalized */
   @Override
-  public synchronized boolean isValidRbw(Block b) {
-    BInfo binfo = blockMap.get(b);
+  public synchronized boolean isValidRbw(ExtendedBlock b) {
+    final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
+    if (map == null) {
+      return false;
+    }
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       return false;
     }
@@ -472,10 +578,11 @@ public class SimulatedFSDataset  impleme
     return getStorageInfo();
   }
 
-  @Override
-  public synchronized ReplicaInPipelineInterface append(Block b,
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    BInfo binfo = blockMap.get(b);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null || !binfo.isFinalized()) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
@@ -484,10 +591,11 @@ public class SimulatedFSDataset  impleme
     return binfo;
   }
 
-  @Override
-  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    BInfo binfo = blockMap.get(b);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
@@ -495,32 +603,34 @@ public class SimulatedFSDataset  impleme
     if (binfo.isFinalized()) {
       binfo.unfinalizeBlock();
     }
-    blockMap.remove(b);
+    map.remove(b.getLocalBlock()); // map is keyed by Block, not ExtendedBlock
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
     return binfo;
   }
 
-  @Override
-  public void recoverClose(Block b, long newGS,
-      long expectedBlockLen) throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+      throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
     }
     if (!binfo.isFinalized()) {
-      binfo.finalizeBlock(binfo.getNumBytes());
+      binfo.finalizeBlock(b.getBlockPoolId(), binfo.getNumBytes());
     }
-    blockMap.remove(b);
+    map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
   }
   
-  @Override
-  public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
       long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
-    BInfo binfo = blockMap.get(b);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if ( binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " does not exist, and cannot be appended to.");
@@ -529,20 +639,20 @@ public class SimulatedFSDataset  impleme
       throw new ReplicaAlreadyExistsException("Block " + b
           + " is valid, and cannot be written to.");
     }
-    blockMap.remove(b);
+    map.remove(b.getLocalBlock()); // map is keyed by Block, not ExtendedBlock
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
     return binfo;
   }
 
-  @Override
-  public synchronized ReplicaInPipelineInterface createRbw(Block b) 
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) 
   throws IOException {
     return createTemporary(b);
   }
 
-  @Override
-  public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
       throws IOException {
     if (isValidBlock(b)) {
           throw new ReplicaAlreadyExistsException("Block " + b + 
@@ -552,35 +662,36 @@ public class SimulatedFSDataset  impleme
         throw new ReplicaAlreadyExistsException("Block " + b + 
             " is being written, and cannot be written to.");
     }
-    BInfo binfo = new BInfo(b, true);
-    blockMap.put(binfo.theBlock, binfo);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
+    map.put(binfo.theBlock, binfo);
     return binfo;
   }
 
-  @Override
-  public synchronized InputStream getBlockInputStream(Block b)
-                                            throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public synchronized InputStream getBlockInputStream(ExtendedBlock b)
+      throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
     
-    //DataNode.LOG.info("Opening block(" + b.blkid + ") of length " + b.len);
     return binfo.getIStream();
   }
   
-  @Override
-  public synchronized InputStream getBlockInputStream(Block b, long seekOffset)
-                              throws IOException {
+  @Override // FSDatasetInterface
+  public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+      long seekOffset) throws IOException {
     InputStream result = getBlockInputStream(b);
     result.skip(seekOffset);
     return result;
   }
 
   /** Not supported */
-  @Override
-  public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff
-      ) throws IOException {
+  @Override // FSDatasetInterface
+  public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
+      long ckoff) throws IOException {
     throw new IOException("Not supported");
   }
 
@@ -591,9 +702,10 @@ public class SimulatedFSDataset  impleme
    * @throws IOException - block does not exist or problems accessing
    *  the meta file
    */
-  private synchronized InputStream getMetaDataInStream(Block b)
+  private synchronized InputStream getMetaDataInStream(ExtendedBlock b)
                                               throws IOException {
-    BInfo binfo = blockMap.get(b);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
@@ -604,9 +716,11 @@ public class SimulatedFSDataset  impleme
     return binfo.getMetaIStream();
   }
  
-  @Override
-  public synchronized long getMetaDataLength(Block b) throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public synchronized long getMetaDataLength(ExtendedBlock b)
+      throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
@@ -617,16 +731,15 @@ public class SimulatedFSDataset  impleme
     return binfo.getMetaIStream().getLength();
   }
   
-  @Override
-  public MetaDataInputStream getMetaDataInputStream(Block b)
-  throws IOException {
-
-       return new MetaDataInputStream(getMetaDataInStream(b),
-                                                getMetaDataLength(b));
+  @Override // FSDatasetInterface
+  public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
+      throws IOException {
+     return new MetaDataInputStream(getMetaDataInStream(b), 
+                                    getMetaDataLength(b));
   }
 
-  @Override
-  public synchronized boolean metaFileExists(Block b) throws IOException {
+  @Override // FSDatasetInterface
+  public synchronized boolean metaFileExists(ExtendedBlock b) throws IOException {
     if (!isValidBlock(b)) {
           throw new IOException("Block " + b +
               " is valid, and cannot be written to.");
@@ -638,8 +751,8 @@ public class SimulatedFSDataset  impleme
     // nothing to check for simulated data set
   }
 
-  @Override
-  public synchronized void adjustCrcChannelPosition(Block b,
+  @Override // FSDatasetInterface
+  public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
                                               BlockWriteStreams stream, 
                                               int checksumSize)
                                               throws IOException {
@@ -812,8 +925,9 @@ public class SimulatedFSDataset  impleme
   @Override
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
   throws IOException {
-    Block b = rBlock.getBlock();
-    BInfo binfo = blockMap.get(b);
+    ExtendedBlock b = rBlock.getBlock();
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
@@ -824,22 +938,44 @@ public class SimulatedFSDataset  impleme
   }
 
   @Override // FSDatasetInterface
-  public FinalizedReplica updateReplicaUnderRecovery(Block oldBlock,
+  public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
                                         long recoveryId,
                                         long newlength) throws IOException {
     return new FinalizedReplica(
         oldBlock.getBlockId(), newlength, recoveryId, null, null);
   }
 
-  @Override
-  public long getReplicaVisibleLength(Block block) throws IOException {
+  @Override // FSDatasetInterface
+  public long getReplicaVisibleLength(ExtendedBlock block) throws IOException {
     return block.getNumBytes();
   }
 
+  @Override // FSDatasetInterface
+  public void addBlockPool(String bpid, Configuration conf) {
+    Map<Block, BInfo> map = new HashMap<Block, BInfo>();
+    blockMap.put(bpid, map);
+    storage.addBlockPool(bpid);
+  }
+  
+  @Override // FSDatasetInterface
+  public void shutdownBlockPool(String bpid) {
+    blockMap.remove(bpid);
+    storage.removeBlockPool(bpid);
+  }
+  
+  @Override // FSDatasetInterface
+  public void deleteBlockPool(String bpid, boolean force) {
+     return;
+  }
+
   @Override
-  public ReplicaInPipelineInterface convertTemporaryToRbw(Block temporary)
+  public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
       throws IOException {
-    final BInfo r = blockMap.get(temporary);
+    final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
+    if (map == null) {
+      throw new IOException("Block pool not found, temporary=" + temporary);
+    }
+    final BInfo r = map.get(temporary.getLocalBlock());
     if (r == null) {
       throw new IOException("Block not found, temporary=" + temporary);
     } else if (r.isFinalized()) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Fri Apr 29 18:16:32 2011
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.REPLACE_BLOCK;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.*;
 
 import java.io.DataInputStream;
@@ -41,9 +40,9 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 /**
  * This class tests if block replacement request to data nodes work correctly.
@@ -120,7 +118,7 @@ public class TestBlockReplacement extend
       LocatedBlock block = locatedBlocks.get(0);
       DatanodeInfo[]  oldNodes = block.getLocations();
       assertEquals(oldNodes.length, 3);
-      Block b = block.getBlock();
+      ExtendedBlock b = block.getBlock();
       
       // add a new datanode to the cluster
       cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS);
@@ -161,11 +159,11 @@ public class TestBlockReplacement extend
       // start to replace the block
       // case 1: proxySource does not contain the block
       LOG.info("Testcase 1: Proxy " + newNode.getName() 
-          + " does not contain the block " + b.getBlockName() );
+           + " does not contain the block " + b);
       assertFalse(replaceBlock(b, source, newNode, proxies.get(0)));
       // case 2: destination contains the block
       LOG.info("Testcase 2: Destination " + proxies.get(1).getName() 
-          + " contains the block " + b.getBlockName() );
+          + " contains the block " + b);
       assertFalse(replaceBlock(b, source, proxies.get(0), proxies.get(1)));
       // case 3: correct case
       LOG.info("Testcase 3: Proxy=" + source.getName() + " source=" + 
@@ -224,7 +222,7 @@ public class TestBlockReplacement extend
    * 
    * Return true if a block is successfully copied; otherwise false.
    */
-  private boolean replaceBlock( Block block, DatanodeInfo source,
+  private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
     Socket sock = new Socket();
     sock.connect(NetUtils.createSocketAddr(
@@ -232,13 +230,8 @@ public class TestBlockReplacement extend
     sock.setKeepAlive(true);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-    REPLACE_BLOCK.write(out);
-    out.writeLong(block.getBlockId());
-    out.writeLong(block.getGenerationStamp());
-    Text.writeString(out, source.getStorageID());
-    sourceProxy.write(out);
-    BlockTokenSecretManager.DUMMY_TOKEN.write(out);
+    DataTransferProtocol.Sender.opReplaceBlock(out, block, source
+        .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Fri Apr 29 18:16:32 2011
@@ -28,12 +28,14 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -67,6 +69,7 @@ public class TestBlockReport {
   static final int BLOCK_SIZE = 1024;
   static final int NUM_BLOCKS = 10;
   static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
+  static String bpid;
 
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
@@ -85,6 +88,7 @@ public class TestBlockReport {
     REPL_FACTOR = 1; //Reset if case a test has modified the value
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
     fs = (DistributedFileSystem) cluster.getFileSystem();
+    bpid = cluster.getNamesystem().getBlockPoolId();
   }
 
   @After
@@ -130,8 +134,11 @@ public class TestBlockReport {
             b.getNumBytes());
       }
     }
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N0).dnRegistration,
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     List<LocatedBlock> blocksAfterReport =
@@ -143,7 +150,7 @@ public class TestBlockReport {
     }
 
     for (int i = 0; i < blocksAfterReport.size(); i++) {
-      Block b = blocksAfterReport.get(i).getBlock();
+      ExtendedBlock b = blocksAfterReport.get(i).getBlock();
       assertEquals("Length of " + i + "th block is incorrect",
         oldLengths[i], b.getNumBytes());
     }
@@ -171,7 +178,7 @@ public class TestBlockReport {
     File dataDir = new File(cluster.getDataDirectory());
     assertTrue(dataDir.isDirectory());
 
-    List<Block> blocks2Remove = new ArrayList<Block>();
+    List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
     List<Integer> removedIndex = new ArrayList<Integer>();
     List<LocatedBlock> lBlocks = cluster.getNameNode().getBlockLocations(
       filePath.toString(), FILE_START,
@@ -192,7 +199,7 @@ public class TestBlockReport {
       LOG.debug("Number of blocks allocated " + lBlocks.size());
     }
 
-    for (Block b : blocks2Remove) {
+    for (ExtendedBlock b : blocks2Remove) {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Removing the block " + b.getBlockName());
       }
@@ -206,8 +213,11 @@ public class TestBlockReport {
 
     waitTil(DN_RESCAN_EXTRA_WAIT);
 
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N0).dnRegistration,
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     cluster.getNamesystem().computeDatanodeWork();
@@ -241,9 +251,12 @@ public class TestBlockReport {
     blocks.get(0).setGenerationStamp(rand.nextLong());
     // This new block is unknown to NN and will be mark for deletion.
     blocks.add(new Block());
-    DatanodeCommand dnCmd =
-      cluster.getNameNode().blockReport(
-        cluster.getDataNodes().get(DN_N0).dnRegistration,
+    
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    DatanodeCommand dnCmd = cluster.getNameNode().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     if(LOG.isDebugEnabled()) {
       LOG.debug("Got the command: " + dnCmd);
@@ -291,9 +304,12 @@ public class TestBlockReport {
     ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
     startDNandWait(filePath, true);
 
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N1).dnRegistration,
-      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N1);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of PendingReplication Blocks",
       0, cluster.getNamesystem().getUnderReplicatedBlocks());
@@ -327,8 +343,7 @@ public class TestBlockReport {
     int randIndex = rand.nextInt(blocks.size());
     // Get a block and screw its GS
     Block corruptedBlock = blocks.get(randIndex);
-    String secondNode = cluster.getDataNodes().get(DN_N1).
-      getDatanodeRegistration().getStorageID();
+    String secondNode = cluster.getDataNodes().get(DN_N1).getStorageId();
     if(LOG.isDebugEnabled()) {
       LOG.debug("Working with " + secondNode);
       LOG.debug("BlockGS before " + blocks.get(randIndex).getGenerationStamp());
@@ -338,9 +353,12 @@ public class TestBlockReport {
       LOG.debug("BlockGS after " + blocks.get(randIndex).getGenerationStamp());
       LOG.debug("Done corrupting GS of " + corruptedBlock.getBlockName());
     }
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N1).dnRegistration,
-      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N1);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of Corrupted blocks",
       1, cluster.getNamesystem().getCorruptReplicaBlocks() +
@@ -360,9 +378,9 @@ public class TestBlockReport {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
     }
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N1).dnRegistration,
-      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+    
+    cluster.getNameNode().blockReport(dnR, poolId,
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
 
     assertEquals("Wrong number of Corrupted blocks",
@@ -406,10 +424,13 @@ public class TestBlockReport {
       bc.start();
 
       waitForTempReplica(bl, DN_N1);
-
-      cluster.getNameNode().blockReport(
-        cluster.getDataNodes().get(DN_N1).dnRegistration,
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+      
+      // all blocks belong to the same file, hence same BP
+      DataNode dn = cluster.getDataNodes().get(DN_N1);
+      String poolId = cluster.getNamesystem().getBlockPoolId();
+      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+      cluster.getNameNode().blockReport(dnR, poolId,
+          new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
         blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@@ -450,9 +471,12 @@ public class TestBlockReport {
 
       waitForTempReplica(bl, DN_N1);
                                                 
-      cluster.getNameNode().blockReport(
-        cluster.getDataNodes().get(DN_N1).dnRegistration,
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+      // all blocks belong to the same file, hence same BP
+      DataNode dn = cluster.getDataNodes().get(DN_N1);
+      String poolId = cluster.getNamesystem().getBlockPoolId();
+      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+      cluster.getNameNode().blockReport(dnR, poolId,
+          new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
         2, cluster.getNamesystem().getPendingReplicationBlocks());
@@ -465,7 +489,7 @@ public class TestBlockReport {
     }
   }
 
-  private void waitForTempReplica(Block bl, int DN_N1) {
+  private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
     final boolean tooLongWait = false;
     final int TIMEOUT = 40000;
     
@@ -478,16 +502,18 @@ public class TestBlockReport {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Total number of DNs " + cluster.getDataNodes().size());
     }
+    cluster.waitActive();
+    
     // Look about specified DN for the replica of the block from 1st DN
-    Replica r;
-    r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
-      fetchReplicaInfo(bl.getBlockId());
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    Replica r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
+      fetchReplicaInfo(bpid, bl.getBlockId());
     long start = System.currentTimeMillis();
     int count = 0;
     while (r == null) {
       waitTil(5);
       r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
-        fetchReplicaInfo(bl.getBlockId());
+        fetchReplicaInfo(bpid, bl.getBlockId());
       long waiting_period = System.currentTimeMillis() - start;
       if (count++ % 100 == 0)
         if(LOG.isDebugEnabled()) {
@@ -548,8 +574,8 @@ public class TestBlockReport {
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("New datanode "
-          + cluster.getDataNodes().get(datanodes.size() - 1)
-          .getDatanodeRegistration() + " has been started");
+          + cluster.getDataNodes().get(datanodes.size() - 1).getMachineName() 
+          + " has been started");
     }
     if (waitReplicas) DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR);
   }
@@ -593,7 +619,7 @@ public class TestBlockReport {
         }
         continue;
       }
-      newList.add(new Block(locatedBlks.get(i).getBlock()));
+      newList.add(new Block(locatedBlks.get(i).getBlock().getLocalBlock()));
     }
     return newList;
   }
@@ -685,7 +711,8 @@ public class TestBlockReport {
 
       // Get block from the first DN
       ret = cluster.getDataNodes().get(DN_N0).
-        data.getStoredBlock(lb.getBlock().getBlockId());
+        data.getStoredBlock(lb.getBlock()
+        .getBlockPoolId(), lb.getBlock().getBlockId());
     return ret;
   }
 

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java Fri Apr 29 18:16:32 2011
@@ -44,6 +44,9 @@ public class TestDataNodeMXBean {
 
       MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); 
       ObjectName mxbeanName = new ObjectName("HadoopInfo:type=DataNodeInfo");
+      // get attribute "ClusterId"
+      String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
+      Assert.assertEquals(datanode.getClusterId(), clusterId);
       // get attribute "Version"
       String version = (String)mbs.getAttribute(mxbeanName, "Version");
       Assert.assertEquals(datanode.getVersion(),version);
@@ -53,10 +56,10 @@ public class TestDataNodeMXBean {
       // get attribute "HttpPort"
       String httpPort = (String)mbs.getAttribute(mxbeanName, "HttpPort");
       Assert.assertEquals(datanode.getHttpPort(),httpPort);
-      // get attribute "NamenodeAddress"
-      String namenodeAddress = (String)mbs.getAttribute(mxbeanName, 
-          "NamenodeAddress");
-      Assert.assertEquals(datanode.getNamenodeAddress(),namenodeAddress);
+      // get attribute "NamenodeAddresses"
+      String namenodeAddresses = (String)mbs.getAttribute(mxbeanName, 
+          "NamenodeAddresses");
+      Assert.assertEquals(datanode.getNamenodeAddresses(),namenodeAddresses);
       // get attribute "getVolumeInfo"
       String volumeInfo = (String)mbs.getAttribute(mxbeanName, "VolumeInfo");
       Assert.assertEquals(replaceDigits(datanode.getVolumeInfo()),

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Fri Apr 29 18:16:32 2011
@@ -33,12 +33,13 @@ import org.apache.hadoop.hdfs.BlockReade
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
 
@@ -119,7 +120,8 @@ public class TestDataNodeVolumeFailure {
     // fail the volume
     // delete/make non-writable one of the directories (failed volume)
     data_fail = new File(dataDir, "data3");
-    failedDir = new File(data_fail, MiniDFSCluster.FINALIZED_DIR_NAME);
+    failedDir = MiniDFSCluster.getFinalizedDir(dataDir, 
+        cluster.getNamesystem().getBlockPoolId());
     if (failedDir.exists() &&
         //!FileUtil.fullyDelete(failedDir)
         !deteteBlocks(failedDir)
@@ -137,8 +139,10 @@ public class TestDataNodeVolumeFailure {
     
     // make sure a block report is sent 
     DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
-    long[] bReport = dn.getFSDataset().getBlockReport().getBlockListAsLongs();
-    cluster.getNameNode().blockReport(dn.dnRegistration, bReport);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
+    long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
+    cluster.getNameNode().blockReport(dnR, bpid, bReport);
 
     // verify number of blocks and files...
     verify(filename, filesize);
@@ -216,7 +220,7 @@ public class TestDataNodeVolumeFailure {
     
     for (LocatedBlock lb : locatedBlocks) {
       DatanodeInfo dinfo = lb.getLocations()[1];
-      Block b = lb.getBlock();
+      ExtendedBlock b = lb.getBlock();
       try {
         accessBlock(dinfo, lb);
       } catch (IOException e) {
@@ -254,8 +258,7 @@ public class TestDataNodeVolumeFailure {
     throws IOException {
     InetSocketAddress targetAddr = null;
     Socket s = null;
-    BlockReader blockReader = null; 
-    Block block = lblock.getBlock(); 
+    ExtendedBlock block = lblock.getBlock(); 
    
     targetAddr = NetUtils.createSocketAddr(datanode.getName());
       
@@ -263,8 +266,10 @@ public class TestDataNodeVolumeFailure {
     s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-    String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-    blockReader = 
+    String file = BlockReader.getFileName(targetAddr, 
+        "test-blockpoolid",
+        block.getBlockId());
+    BlockReader blockReader = 
       BlockReader.newBlockReader(s, file, block, lblock
         .getBlockToken(), 0, -1, 4096);
 
@@ -314,9 +319,11 @@ public class TestDataNodeVolumeFailure {
    */
   private int countRealBlocks(Map<String, BlockLocs> map) {
     int total = 0;
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
     for(int i=0; i<dn_num; i++) {
-      for(int j=1; j<=2; j++) {
-        File dir = new File(dataDir, "data"+(2*i+j)+MiniDFSCluster.FINALIZED_DIR_NAME);
+      for(int j=0; j<=1; j++) {
+        File storageDir = MiniDFSCluster.getStorageDir(i, j);
+        File dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
         if(dir == null) {
           System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
           continue;

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java Fri Apr 29 18:16:32 2011
@@ -147,9 +147,9 @@ public class TestDataNodeVolumeFailureRe
     DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
     DFSTestUtil.waitReplication(fs, file1, (short)3);
     ArrayList<DataNode> dns = cluster.getDataNodes();
-    assertTrue("DN1 should be up", DataNode.isDatanodeUp(dns.get(0)));
-    assertTrue("DN2 should be up", DataNode.isDatanodeUp(dns.get(1)));
-    assertTrue("DN3 should be up", DataNode.isDatanodeUp(dns.get(2)));
+    assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
+    assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
+    assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
 
     /*
      * The metrics should confirm the volume failures.
@@ -188,7 +188,7 @@ public class TestDataNodeVolumeFailureRe
     Path file2 = new Path("/test2");
     DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
     DFSTestUtil.waitReplication(fs, file2, (short)3);
-    assertTrue("DN3 should still be up", DataNode.isDatanodeUp(dns.get(2)));
+    assertTrue("DN3 should still be up", dns.get(2).isDatanodeUp());
     assertEquals("Vol3 should report 1 failure",
         1, metrics3.volumesFailed.getCurrentIntervalValue());
     live.clear();
@@ -233,7 +233,7 @@ public class TestDataNodeVolumeFailureRe
     DFSTestUtil.createFile(fs, file3, 1024, (short)3, 1L);
     DFSTestUtil.waitReplication(fs, file3, (short)2);
     // Eventually the DN should go down
-    while (DataNode.isDatanodeUp(dns.get(2))) {
+    while (dns.get(2).isDatanodeUp()) {
       Thread.sleep(1000);
     }
     // and report two failed volumes

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java Fri Apr 29 18:16:32 2011
@@ -98,7 +98,7 @@ public class TestDatanodeRestart {
       out.write(writeBuf);
       out.hflush();
       DataNode dn = cluster.getDataNodes().get(0);
-      for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) {
+      for (FSVolume volume : ((FSDataset)dn.data).volumes.getVolumes()) {
         File currentDir = volume.getDir().getParentFile();
         File rbwDir = new File(currentDir, "rbw");
         for (File file : rbwDir.listFiles()) {
@@ -112,16 +112,17 @@ public class TestDatanodeRestart {
       dn = cluster.getDataNodes().get(0);
 
       // check volumeMap: one rwr replica
+      String bpid = cluster.getNamesystem().getBlockPoolId();
       ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
-      Assert.assertEquals(1, replicas.size());
-      ReplicaInfo replica = replicas.replicas().iterator().next();
+      Assert.assertEquals(1, replicas.size(bpid));
+      ReplicaInfo replica = replicas.replicas(bpid).iterator().next();
       Assert.assertEquals(ReplicaState.RWR, replica.getState());
       if (isCorrupt) {
         Assert.assertEquals((fileLen-1)/512*512, replica.getNumBytes());
       } else {
         Assert.assertEquals(fileLen, replica.getNumBytes());
       }
-      dn.data.invalidate(new Block[]{replica});
+      dn.data.invalidate(bpid, new Block[]{replica});
     } finally {
       IOUtils.closeStream(out);
       if (fs.exists(src)) {
@@ -146,9 +147,10 @@ public class TestDatanodeRestart {
         DFSTestUtil.createFile(fs, fileName, 1, (short)1, 0L);
         DFSTestUtil.waitReplication(fs, fileName, (short)1);
       }
+      String bpid = cluster.getNamesystem().getBlockPoolId();
       DataNode dn = cluster.getDataNodes().get(0);
       Iterator<ReplicaInfo> replicasItor = 
-        ((FSDataset)dn.data).volumeMap.replicas().iterator();
+        ((FSDataset)dn.data).volumeMap.replicas(bpid).iterator();
       ReplicaInfo replica = replicasItor.next();
       createUnlinkTmpFile(replica, true, true); // rename block file
       createUnlinkTmpFile(replica, false, true); // rename meta file
@@ -165,7 +167,7 @@ public class TestDatanodeRestart {
 
       // check volumeMap: 4 finalized replica
       Collection<ReplicaInfo> replicas = 
-        ((FSDataset)(dn.data)).volumeMap.replicas();
+        ((FSDataset)(dn.data)).volumeMap.replicas(bpid);
       Assert.assertEquals(4, replicas.size());
       replicasItor = replicas.iterator();
       while (replicasItor.hasNext()) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Fri Apr 29 18:16:32 2011
@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -48,6 +50,7 @@ public class TestDirectoryScanner extend
   private static final int DEFAULT_GEN_STAMP = 9999;
 
   private MiniDFSCluster cluster;
+  private String bpid;
   private FSDataset fds = null;
   private DirectoryScanner scanner = null;
   private Random rand = new Random();
@@ -69,7 +72,7 @@ public class TestDirectoryScanner extend
   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
     synchronized (fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
         // Truncate a block file that has a corresponding metadata file
@@ -88,7 +91,7 @@ public class TestDirectoryScanner extend
   /** Delete a block file */
   private long deleteBlockFile() {
     synchronized(fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
         // Delete a block file that has corresponding metadata file
@@ -104,7 +107,7 @@ public class TestDirectoryScanner extend
   /** Delete block meta file */
   private long deleteMetaFile() {
     synchronized(fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
         File file = b.getMetaFile();
         // Delete a metadata file
         if (file.exists() && file.delete()) {
@@ -121,7 +124,7 @@ public class TestDirectoryScanner extend
     long id = rand.nextLong();
     while (true) {
       id = rand.nextLong();
-      if (fds.fetchReplicaInfo(id) == null) {
+      if (fds.fetchReplicaInfo(bpid, id) == null) {
         break;
       }
     }
@@ -139,10 +142,11 @@ public class TestDirectoryScanner extend
 
   /** Create a block file in a random volume*/
   private long createBlockFile() throws IOException {
-    FSVolume[] volumes = fds.volumes.volumes;
-    int index = rand.nextInt(volumes.length - 1);
+    List<FSVolume> volumes = fds.volumes.getVolumes();
+    int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
-    File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+    File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
+    File file = new File(finalizedDir, getBlockFile(id));
     if (file.createNewFile()) {
       LOG.info("Created block file " + file.getName());
     }
@@ -151,10 +155,11 @@ public class TestDirectoryScanner extend
 
   /** Create a metafile in a random volume*/
   private long createMetaFile() throws IOException {
-    FSVolume[] volumes = fds.volumes.volumes;
-    int index = rand.nextInt(volumes.length - 1);
+    List<FSVolume> volumes = fds.volumes.getVolumes();
+    int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
-    File file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+    File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
+    File file = new File(finalizedDir, getMetaFile(id));
     if (file.createNewFile()) {
       LOG.info("Created metafile " + file.getName());
     }
@@ -163,10 +168,11 @@ public class TestDirectoryScanner extend
 
   /** Create block file and corresponding metafile in a rondom volume */
   private long createBlockMetaFile() throws IOException {
-    FSVolume[] volumes = fds.volumes.volumes;
-    int index = rand.nextInt(volumes.length - 1);
+    List<FSVolume> volumes = fds.volumes.getVolumes();
+    int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
-    File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+    File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
+    File file = new File(finalizedDir, getBlockFile(id));
     if (file.createNewFile()) {
       LOG.info("Created block file " + file.getName());
 
@@ -185,7 +191,7 @@ public class TestDirectoryScanner extend
         LOG.info("Created extraneous file " + name2);
       }
 
-      file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+      file = new File(finalizedDir, getMetaFile(id));
       if (file.createNewFile()) {
         LOG.info("Created metafile " + file.getName());
       }
@@ -196,12 +202,18 @@ public class TestDirectoryScanner extend
   private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
       long missingMemoryBlocks, long mismatchBlocks) {
     scanner.reconcile();
-    assertEquals(totalBlocks, scanner.totalBlocks);
-    assertEquals(diffsize, scanner.diff.size());
-    assertEquals(missingMetaFile, scanner.missingMetaFile);
-    assertEquals(missingBlockFile, scanner.missingBlockFile);
-    assertEquals(missingMemoryBlocks, scanner.missingMemoryBlocks);
-    assertEquals(mismatchBlocks, scanner.mismatchBlocks);
+    
+    assertTrue(scanner.diffs.containsKey(bpid));
+    LinkedList<DirectoryScanner.ScanInfo> diff = scanner.diffs.get(bpid);
+    assertTrue(scanner.stats.containsKey(bpid));
+    DirectoryScanner.Stats stats = scanner.stats.get(bpid);
+    
+    assertEquals(diffsize, diff.size());
+    assertEquals(totalBlocks, stats.totalBlocks);
+    assertEquals(missingMetaFile, stats.missingMetaFile);
+    assertEquals(missingBlockFile, stats.missingBlockFile);
+    assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
+    assertEquals(mismatchBlocks, stats.mismatchBlocks);
   }
 
   public void testDirectoryScanner() throws Exception {
@@ -215,10 +227,12 @@ public class TestDirectoryScanner extend
     cluster = new MiniDFSCluster.Builder(CONF).build();
     try {
       cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
       fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
       scanner = new DirectoryScanner(fds, CONF);
+      scanner.setRetainDiffs(true);
 
       // Add files with 100 blocks
       createFile("/tmp/t1", 10000);
@@ -318,19 +332,26 @@ public class TestDirectoryScanner extend
       truncateBlockFile();
       scan(totalBlocks+3, 6, 2, 2, 3, 2);
       scan(totalBlocks+1, 0, 0, 0, 0, 0);
+      
+      // Test14: validate clean shutdown of DirectoryScanner
+      // Disabled (assumes "real" FSDataset, not sim): assertTrue(scanner.getRunStatus());
+      scanner.shutdown();
+      assertFalse(scanner.getRunStatus());
+      
     } finally {
+      scanner.shutdown();
       cluster.shutdown();
     }
   }
 
   private void verifyAddition(long blockId, long genStamp, long size) {
     final ReplicaInfo replicainfo;
-    replicainfo = fds.fetchReplicaInfo(blockId);
+    replicainfo = fds.fetchReplicaInfo(bpid, blockId);
     assertNotNull(replicainfo);
 
     // Added block has the same file as the one created by the test
     File file = new File(getBlockFile(blockId));
-    assertEquals(file.getName(), fds.findBlockFile(blockId).getName());
+    assertEquals(file.getName(), fds.findBlockFile(bpid, blockId).getName());
 
     // Generation stamp is same as that of created file
     assertEquals(genStamp, replicainfo.getGenerationStamp());
@@ -341,12 +362,12 @@ public class TestDirectoryScanner extend
 
   private void verifyDeletion(long blockId) {
     // Ensure block does not exist in memory
-    assertNull(fds.fetchReplicaInfo(blockId));
+    assertNull(fds.fetchReplicaInfo(bpid, blockId));
   }
 
   private void verifyGenStamp(long blockId, long genStamp) {
     final ReplicaInfo memBlock;
-    memBlock = fds.fetchReplicaInfo(blockId);
+    memBlock = fds.fetchReplicaInfo(bpid, blockId);
     assertNotNull(memBlock);
     assertEquals(genStamp, memBlock.getGenerationStamp());
   }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Fri Apr 29 18:16:32 2011
@@ -51,7 +51,6 @@ public class TestDiskError {
   private FileSystem fs;
   private MiniDFSCluster cluster;
   private Configuration conf;
-  private String dataDir;
 
   @Before
   public void setUp() throws Exception {
@@ -60,7 +59,6 @@ public class TestDiskError {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
-    dataDir = cluster.getDataDirectory();
   }
 
   @After
@@ -86,8 +84,11 @@ public class TestDiskError {
     cluster.startDataNodes(conf, 2, true, null, null);
     cluster.waitActive();
     final int dnIndex = 0;
-    File dir1 = new File(new File(dataDir, "data"+(2*dnIndex+1)), "current/rbw");
-    File dir2 = new File(new File(dataDir, "data"+(2*dnIndex+2)), "current/rbw");
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    File storageDir = MiniDFSCluster.getStorageDir(dnIndex, 0);
+    File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
+    storageDir = MiniDFSCluster.getStorageDir(dnIndex, 1);
+    File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
     try {
       // make the data directory of the first datanode to be readonly
       assertTrue("Couldn't chmod local vol", dir1.setReadOnly());
@@ -95,7 +96,7 @@ public class TestDiskError {
 
       // create files and make sure that first datanode will be down
       DataNode dn = cluster.getDataNodes().get(dnIndex);
-      for (int i=0; DataNode.isDatanodeUp(dn); i++) {
+      for (int i=0; dn.isDatanodeUp(); i++) {
         Path fileName = new Path("/test.txt"+i);
         DFSTestUtil.createFile(fs, fileName, 1024, (short)2, 1L);
         DFSTestUtil.waitReplication(fs, fileName, (short)2);
@@ -152,9 +153,11 @@ public class TestDiskError {
     out.close();
 
     // the temporary block & meta files should be deleted
-    String dataDir = cluster.getDataDirectory();
-    File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "current/rbw");
-    File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "current/rbw");
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    File storageDir = MiniDFSCluster.getStorageDir(sndNode, 0);
+    File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
+    storageDir = MiniDFSCluster.getStorageDir(sndNode, 1);
+    File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
     while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
       Thread.sleep(100);
     }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Fri Apr 29 18:16:32 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -46,8 +47,8 @@ import org.junit.Test;
  * This tests InterDataNodeProtocol for block handling. 
  */
 public class TestInterDatanodeProtocol {
-  public static void checkMetaInfo(Block b, DataNode dn) throws IOException {
-    Block metainfo = dn.data.getStoredBlock(b.getBlockId());
+  public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
+    Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
     Assert.assertEquals(b.getBlockId(), metainfo.getBlockId());
     Assert.assertEquals(b.getNumBytes(), metainfo.getNumBytes());
   }
@@ -97,10 +98,12 @@ public class TestInterDatanodeProtocol {
       assertTrue(datanode != null);
       
       //stop block scanner, so we could compare lastScanTime
-      datanode.blockScannerThread.interrupt();
+      if (datanode.blockScanner != null) {
+        datanode.blockScanner.shutdown();
+      }
 
       //verify BlockMetaDataInfo
-      Block b = locatedblock.getBlock();
+      ExtendedBlock b = locatedblock.getBlock();
       InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass());
       checkMetaInfo(b, datanode);
       long recoveryId = b.getGenerationStamp() + 1;
@@ -108,7 +111,7 @@ public class TestInterDatanodeProtocol {
           new RecoveringBlock(b, locatedblock.getLocations(), recoveryId));
 
       //verify updateBlock
-      Block newblock = new Block(
+      ExtendedBlock newblock = new ExtendedBlock(b.getBlockPoolId(),
           b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1);
       idp.updateReplicaUnderRecovery(b, recoveryId, newblock.getNumBytes());
       checkMetaInfo(newblock, datanode);
@@ -129,44 +132,47 @@ public class TestInterDatanodeProtocol {
     Assert.assertEquals(originalInfo.getState(), recoveryInfo.getOriginalReplicaState());
   }
 
-  /** Test {@link FSDataset#initReplicaRecovery(ReplicasMap, Block, long)} */
+  /** Test for
+   * {@link FSDataset#initReplicaRecovery(String, ReplicasMap, Block, long)}.
+   */
   @Test
   public void testInitReplicaRecovery() throws IOException {
     final long firstblockid = 10000L;
     final long gs = 7777L;
     final long length = 22L;
-    final ReplicasMap map = new ReplicasMap();
+    final ReplicasMap map = new ReplicasMap(this);
+    String bpid = "BP-TEST";
     final Block[] blocks = new Block[5];
     for(int i = 0; i < blocks.length; i++) {
       blocks[i] = new Block(firstblockid + i, length, gs);
-      map.add(createReplicaInfo(blocks[i]));
+      map.add(bpid, createReplicaInfo(blocks[i]));
     }
     
     { 
       //normal case
       final Block b = blocks[0];
-      final ReplicaInfo originalInfo = map.get(b);
+      final ReplicaInfo originalInfo = map.get(bpid, b);
 
       final long recoveryid = gs + 1;
-      final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid);
+      final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid);
       assertEquals(originalInfo, recoveryInfo);
 
-      final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(b);
+      final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
       Assert.assertEquals(originalInfo.getBlockId(), updatedInfo.getBlockId());
       Assert.assertEquals(recoveryid, updatedInfo.getRecoveryID());
 
       //recover one more time 
       final long recoveryid2 = gs + 2;
-      final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid2);
+      final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
       assertEquals(originalInfo, recoveryInfo2);
 
-      final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(b);
+      final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
       Assert.assertEquals(originalInfo.getBlockId(), updatedInfo2.getBlockId());
       Assert.assertEquals(recoveryid2, updatedInfo2.getRecoveryID());
       
       //case RecoveryInProgressException
       try {
-        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
         Assert.fail();
       }
       catch(RecoveryInProgressException ripe) {
@@ -177,7 +183,7 @@ public class TestInterDatanodeProtocol {
     { // BlockRecoveryFI_01: replica not found
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid - 1, length, gs);
-      ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(map, b, recoveryid);
+      ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
       Assert.assertNull("Data-node should not have this replica.", r);
     }
     
@@ -185,7 +191,7 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs - 1;
       final Block b = new Block(firstblockid + 1, length, gs);
       try {
-        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
         Assert.fail();
       }
       catch(IOException ioe) {
@@ -198,7 +204,7 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid, length, gs+1);
       try {
-        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
         fail("InitReplicaRecovery should fail because replica's " +
         		"gs is less than the block's gs");
       } catch (IOException e) {
@@ -208,7 +214,10 @@ public class TestInterDatanodeProtocol {
     }
   }
 
-  /** Test {@link FSDataset#updateReplicaUnderRecovery(ReplicaUnderRecovery, long, long)} */
+  /**
+   * Test for
+   * {@link FSDataset#updateReplicaUnderRecovery(ExtendedBlock, long, long)}.
+   */
   @Test
   public void testUpdateReplicaUnderRecovery() throws IOException {
     final Configuration conf = new HdfsConfiguration();
@@ -217,6 +226,7 @@ public class TestInterDatanodeProtocol {
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
       cluster.waitActive();
+      String bpid = cluster.getNamesystem().getBlockPoolId();
 
       //create a file
       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
@@ -237,14 +247,14 @@ public class TestInterDatanodeProtocol {
       final FSDataset fsdataset = (FSDataset)datanode.data;
 
       //initReplicaRecovery
-      final Block b = locatedblock.getBlock();
+      final ExtendedBlock b = locatedblock.getBlock();
       final long recoveryid = b.getGenerationStamp() + 1;
       final long newlength = b.getNumBytes() - 1;
       final ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery(
           new RecoveringBlock(b, null, recoveryid));
 
       //check replica
-      final ReplicaInfo replica = fsdataset.fetchReplicaInfo(b.getBlockId());
+      final ReplicaInfo replica = fsdataset.fetchReplicaInfo(bpid, b.getBlockId());
       Assert.assertEquals(ReplicaState.RUR, replica.getState());
 
       //check meta data before update
@@ -254,8 +264,8 @@ public class TestInterDatanodeProtocol {
       //with (block length) != (stored replica's on disk length). 
       {
         //create a block with same id and gs but different length.
-        final Block tmp = new Block(rri.getBlockId(), rri.getNumBytes() - 1,
-            rri.getGenerationStamp());
+        final ExtendedBlock tmp = new ExtendedBlock(b.getBlockPoolId(), rri
+            .getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp());
         try {
           //update should fail
           fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, newlength);
@@ -267,7 +277,7 @@ public class TestInterDatanodeProtocol {
 
       //update
       final ReplicaInfo finalized = fsdataset.updateReplicaUnderRecovery(
-          rri, recoveryid, newlength);
+          new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength);
 
       //check meta data after update
       FSDataset.checkReplicaFiles(finalized);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java Fri Apr 29 18:16:32 2011
@@ -26,12 +26,13 @@ import org.junit.Test;
  * Unit test for ReplicasMap class
  */
 public class TestReplicasMap {
-  private static final ReplicasMap map = new ReplicasMap();
+  private static final ReplicasMap map = new ReplicasMap(TestReplicasMap.class);
+  private static final String bpid = "BP-TEST";
   private static final  Block block = new Block(1234, 1234, 1234);
   
   @BeforeClass
   public static void setup() {
-    map.add(new FinalizedReplica(block, null, null));
+    map.add(bpid, new FinalizedReplica(block, null, null));
   }
   
   /**
@@ -41,35 +42,35 @@ public class TestReplicasMap {
   public void testGet() {
     // Test 1: null argument throws invalid argument exception
     try {
-      map.get(null);
+      map.get(bpid, null);
       fail("Expected exception not thrown");
     } catch (IllegalArgumentException expected) { }
     
     // Test 2: successful lookup based on block
-    assertNotNull(map.get(block));
+    assertNotNull(map.get(bpid, block));
     
     // Test 3: Lookup failure - generation stamp mismatch 
     Block b = new Block(block);
     b.setGenerationStamp(0);
-    assertNull(map.get(b));
+    assertNull(map.get(bpid, b));
     
     // Test 4: Lookup failure - blockID mismatch
     b.setGenerationStamp(block.getGenerationStamp());
     b.setBlockId(0);
-    assertNull(map.get(b));
+    assertNull(map.get(bpid, b));
     
     // Test 5: successful lookup based on block ID
-    assertNotNull(map.get(block.getBlockId()));
+    assertNotNull(map.get(bpid, block.getBlockId()));
     
     // Test 6: failed lookup for invalid block ID
-    assertNull(map.get(0));
+    assertNull(map.get(bpid, 0));
   }
   
   @Test
   public void testAdd() {
     // Test 1: null argument throws invalid argument exception
     try {
-      map.add(null);
+      map.add(bpid, null);
       fail("Expected exception not thrown");
     } catch (IllegalArgumentException expected) { }
   }
@@ -78,28 +79,28 @@ public class TestReplicasMap {
   public void testRemove() {
     // Test 1: null argument throws invalid argument exception
     try {
-      map.remove(null);
+      map.remove(bpid, null);
       fail("Expected exception not thrown");
     } catch (IllegalArgumentException expected) { }
     
     // Test 2: remove failure - generation stamp mismatch 
     Block b = new Block(block);
     b.setGenerationStamp(0);
-    assertNull(map.remove(b));
+    assertNull(map.remove(bpid, b));
     
     // Test 3: remove failure - blockID mismatch
     b.setGenerationStamp(block.getGenerationStamp());
     b.setBlockId(0);
-    assertNull(map.remove(b));
+    assertNull(map.remove(bpid, b));
     
     // Test 4: remove success
-    assertNotNull(map.remove(block));
+    assertNotNull(map.remove(bpid, block));
     
     // Test 5: remove failure - invalid blockID
-    assertNull(map.remove(0));
+    assertNull(map.remove(bpid, 0));
     
     // Test 6: remove success
-    map.add(new FinalizedReplica(block, null, null));
-    assertNotNull(map.remove(block.getBlockId()));
+    map.add(bpid, new FinalizedReplica(block, null, null));
+    assertNotNull(map.remove(bpid, block.getBlockId()));
   }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java Fri Apr 29 18:16:32 2011
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import junit.framework.Assert;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -29,28 +32,28 @@ public class TestRoundRobinVolumesPolicy
   // Test the Round-Robin block-volume choosing algorithm.
   @Test
   public void testRR() throws Exception {
-    FSVolume[] volumes = new FSVolume[2];
+    final List<FSVolume> volumes = new ArrayList<FSVolume>();
 
     // First volume, with 100 bytes of space.
-    volumes[0] = Mockito.mock(FSVolume.class);
-    Mockito.when(volumes[0].getAvailable()).thenReturn(100L);
+    volumes.add(Mockito.mock(FSVolume.class));
+    Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
 
     // Second volume, with 200 bytes of space.
-    volumes[1] = Mockito.mock(FSVolume.class);
-    Mockito.when(volumes[1].getAvailable()).thenReturn(200L);
+    volumes.add(Mockito.mock(FSVolume.class));
+    Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
 
     RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance(
         RoundRobinVolumesPolicy.class, null);
     
     // Test two rounds of round-robin choosing
-    Assert.assertEquals(volumes[0], policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes[0], policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
 
     // The first volume has only 100L space, so the policy should
     // wisely choose the second one in case we ask for more.
-    Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 150));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150));
 
     // Fail if no volume can be chosen?
     try {