You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2017/12/18 21:06:13 UTC

[3/3] hadoop git commit: HDFS-12818. Support multiple storages in DataNodeCluster / SimulatedFSDataset. Contributed by Erik Krogen.

HDFS-12818. Support multiple storages in DataNodeCluster / SimulatedFSDataset. Contributed by Erik Krogen.

(cherry picked from commit 94576b17fbc19c440efafb6c3322f53ec78a5b55)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/da4e2f38
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/da4e2f38
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/da4e2f38

Branch: refs/heads/branch-2
Commit: da4e2f38e1eedef07e19ccb4b701f4b8f906bf2e
Parents: 1ef906e
Author: Erik Krogen <ek...@linkedin.com>
Authored: Mon Dec 18 11:36:22 2017 -0800
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Mon Dec 18 12:23:20 2017 -0800

----------------------------------------------------------------------
 .../server/datanode/SimulatedFSDataset.java     | 305 +++++++++++++------
 .../server/datanode/TestSimulatedFSDataset.java | 147 +++++----
 ...tSimulatedFSDatasetWithMultipleStorages.java |  50 +++
 3 files changed, 351 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/da4e2f38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 94b5b72..06bbe6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -22,22 +22,25 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.channels.ClosedChannelException;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import com.google.common.math.LongMath;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -83,6 +86,7 @@ import org.apache.hadoop.util.DataChecksum;
  */
 public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public final static int BYTE_MASK = 0xff;
+  private final static int DEFAULT_NUM_SIMULATED_DATA_DIRS = 1;
   static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
     @Override
     public SimulatedFSDataset newInstance(DataNode datanode,
@@ -96,9 +100,41 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
   }
 
+  /**
+   * Used to change the default number of data storages and to mark the
+   * FSDataset as simulated.
+   */
+  static class TestUtilsFactory
+      extends FsDatasetTestUtils.Factory<FsDatasetTestUtils> {
+    @Override
+    public FsDatasetTestUtils newInstance(DataNode datanode) {
+      return new FsDatasetImplTestUtils(datanode) {
+        @Override
+        public int getDefaultNumOfDataDirs() {
+          return DEFAULT_NUM_SIMULATED_DATA_DIRS;
+        }
+      };
+    }
+
+    @Override
+    public boolean isSimulated() {
+      return true;
+    }
+
+    @Override
+    public int getDefaultNumOfDataDirs() {
+      return DEFAULT_NUM_SIMULATED_DATA_DIRS;
+    }
+
+  }
+
   public static void setFactory(Configuration conf) {
     conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
         Factory.class.getName());
+    conf.setClass("org.apache.hadoop.hdfs.server.datanode." +
+            "SimulatedFSDatasetTestUtilsFactory",
+        TestUtilsFactory.class, FsDatasetTestUtils.Factory.class
+    );
   }
 
   public static byte simulatedByte(Block b, long offsetInBlk) {
@@ -145,7 +181,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       if (theBlock.getNumBytes() < 0) {
         theBlock.setNumBytes(0);
       }
-      if (!storage.alloc(bpid, theBlock.getNumBytes())) {
+      if (!getStorage(theBlock).alloc(bpid, theBlock.getNumBytes())) {
         // expected length - actual length may
         // be more - we find out at finalize
         DataNode.LOG.warn("Lack of free storage on a block alloc");
@@ -163,7 +199,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     public String getStorageUuid() {
-      return storage.getStorageUuid();
+      return getStorage(theBlock).getStorageUuid();
     }
 
     @Override
@@ -220,12 +256,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       // adjust if necessary
       long extraLen = finalSize - theBlock.getNumBytes();
       if (extraLen > 0) {
-        if (!storage.alloc(bpid,extraLen)) {
+        if (!getStorage(theBlock).alloc(bpid, extraLen)) {
           DataNode.LOG.warn("Lack of free storage on a block alloc");
           throw new IOException("Creating block, no free space available");
         }
       } else {
-        storage.free(bpid, -extraLen);
+        getStorage(theBlock).free(bpid, -extraLen);
       }
       theBlock.setNumBytes(finalSize);
 
@@ -264,7 +300,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       } else {
         SimulatedOutputStream crcStream = new SimulatedOutputStream();
         return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
-            volume, fileIoProvider);
+            getStorage(theBlock).getVolume(), fileIoProvider);
       }
     }
 
@@ -339,6 +375,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
    */
   private static class SimulatedBPStorage {
     private long used;    // in bytes
+    private final Map<Block, BInfo> blockMap = new TreeMap<>();
 
     long getUsed() {
       return used;
@@ -352,6 +389,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       used -= amount;
     }
 
+    Map<Block, BInfo> getBlockMap() {
+      return blockMap;
+    }
+
     SimulatedBPStorage() {
       used = 0;
     }
@@ -363,10 +404,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
    */
   private static class SimulatedStorage {
     private final Map<String, SimulatedBPStorage> map =
-      new HashMap<String, SimulatedBPStorage>();
+        new ConcurrentHashMap<>();
 
     private final long capacity;  // in bytes
     private final DatanodeStorage dnStorage;
+    private final SimulatedVolume volume;
 
     synchronized long getFree() {
       return capacity - getUsed();
@@ -404,11 +446,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       getBPStorage(bpid).free(amount);
     }
 
-    SimulatedStorage(long cap, DatanodeStorage.State state) {
+    SimulatedStorage(long cap, DatanodeStorage.State state,
+        FileIoProvider fileIoProvider, Configuration conf) {
       capacity = cap;
       dnStorage = new DatanodeStorage(
           "SimulatedStorage-" + DatanodeStorage.generateUuid(),
           state, StorageType.DEFAULT);
+      DataNodeVolumeMetrics volumeMetrics =
+          DataNodeVolumeMetrics.create(conf, dnStorage.getStorageID());
+      this.volume = new SimulatedVolume(this, fileIoProvider, volumeMetrics);
     }
 
     synchronized void addBlockPool(String bpid) {
@@ -444,6 +490,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
           false, getCapacity(), getUsed(), getFree(),
           map.get(bpid).getUsed(), 0L);
     }
+
+    SimulatedVolume getVolume() {
+      return volume;
+    }
+
+    Map<Block, BInfo> getBlockMap(String bpid) throws IOException {
+      SimulatedBPStorage bpStorage = map.get(bpid);
+      if (bpStorage == null) {
+        throw new IOException("Nonexistent block pool: " + bpid);
+      }
+      return bpStorage.getBlockMap();
+    }
   }
 
   static class SimulatedVolume implements FsVolumeSpi {
@@ -565,10 +623,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
   }
 
-  private final Map<String, Map<Block, BInfo>> blockMap
-      = new ConcurrentHashMap<String, Map<Block,BInfo>>();
-  private final SimulatedStorage storage;
-  private final SimulatedVolume volume;
+  private final List<SimulatedStorage> storages;
   private final String datanodeUuid;
   private final DataNode datanode;
 
@@ -579,27 +634,30 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) {
     this.datanode = datanode;
-    if (storage != null) {
+    int storageCount;
+    if (storage != null && storage.getNumStorageDirs() > 0) {
+      storageCount = storage.getNumStorageDirs();
       for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
         DataStorage.createStorageID(storage.getStorageDir(i), false);
       }
       this.datanodeUuid = storage.getDatanodeUuid();
     } else {
+      storageCount = DataNode.getStorageLocations(conf).size();
       this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid();
     }
 
     registerMBean(datanodeUuid);
     this.fileIoProvider = new FileIoProvider(conf, datanode);
-    this.storage = new SimulatedStorage(
-        conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
-        conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
-
-    // TODO: per volume id or path
-    DataNodeVolumeMetrics volumeMetrics = DataNodeVolumeMetrics.create(conf,
-        datanodeUuid);
-    this.volume = new SimulatedVolume(this.storage, this.fileIoProvider,
-        volumeMetrics);
+
     this.datasetLock = new AutoCloseableLock();
+
+    this.storages = new ArrayList<>();
+    for (int i = 0; i < storageCount; i++) {
+      this.storages.add(new SimulatedStorage(
+          conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
+          conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE),
+          fileIoProvider, conf));
+    }
   }
 
   public synchronized void injectBlocks(String bpid,
@@ -615,33 +673,50 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
           throw new IOException("Block already exists in  block list");
         }
       }
-      Map<Block, BInfo> map = blockMap.get(bpid);
-      if (map == null) {
-        map = new HashMap<Block, BInfo>();
-        blockMap.put(bpid, map);
+
+      for (SimulatedStorage storage : storages) {
+        storage.addBlockPool(bpid);
       }
 
       for (Block b: injectBlocks) {
         BInfo binfo = new BInfo(bpid, b, false);
-        map.put(binfo.theBlock, binfo);
+        getBlockMap(b, bpid).put(binfo.theBlock, binfo);
       }
     }
   }
 
-  /** Get a map for a given block pool Id */
-  private Map<Block, BInfo> getMap(String bpid) throws IOException {
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map == null) {
-      throw new IOException("Non existent blockpool " + bpid);
-    }
-    return map;
+  /** Get the storage that a given block lives within. */
+  private SimulatedStorage getStorage(Block b) {
+    return storages.get(LongMath.mod(b.getBlockId(), storages.size()));
+  }
+  
+  /**
+   * Get the block map that a given block lives within, assuming it is within
+   * block pool bpid.
+   * @param b The block to look for
+   * @param bpid The block pool that contains b
+   * @return The block map (non-null)
+   * @throws IOException if bpid does not exist
+   */
+  private Map<Block, BInfo> getBlockMap(Block b, String bpid)
+      throws IOException {
+    return getStorage(b).getBlockMap(bpid);
+  }
+
+  /**
+   * Get the block map that a given block lives within.
+   * @param b The extended block to look for
+   * @return The block map (non-null)
+   * @throws IOException if b is in a nonexistent block pool
+   */
+  private Map<Block, BInfo> getBlockMap(ExtendedBlock b) throws IOException {
+    return getBlockMap(b.getLocalBlock(), b.getBlockPoolId());
   }
 
   @Override // FsDatasetSpi
   public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
       throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
@@ -651,20 +726,21 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   @Override // FsDatasetSpi
   public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{
     if (isValidRbw(b)) {
-      final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-      map.remove(b.getLocalBlock());
+      getBlockMap(b).remove(b.getLocalBlock());
     }
   }
 
-  synchronized BlockListAsLongs getBlockReport(String bpid) {
+  synchronized BlockListAsLongs getBlockReport(String bpid,
+      SimulatedStorage storage) {
     BlockListAsLongs.Builder report = BlockListAsLongs.builder();
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map != null) {
-      for (BInfo b : map.values()) {
+    try {
+      for (BInfo b : storage.getBlockMap(bpid).values()) {
         if (b.isFinalized()) {
           report.add(b);
         }
       }
+    } catch (IOException ioe) {
+      DataNode.LOG.error("Exception while getting block reports", ioe);
     }
     return report.build();
   }
@@ -672,7 +748,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   @Override
   public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
       String bpid) {
-    return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid));
+    Map<DatanodeStorage, BlockListAsLongs> blockReports = new HashMap<>();
+    for (SimulatedStorage storage : storages) {
+      blockReports.put(storage.getDnStorage(), getBlockReport(bpid, storage));
+    }
+    return blockReports;
   }
 
   @Override // FsDatasetSpi
@@ -682,27 +762,49 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override // FSDatasetMBean
   public long getCapacity() {
-    return storage.getCapacity();
+    long total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getCapacity();
+    }
+    return total;
   }
 
   @Override // FSDatasetMBean
   public long getDfsUsed() {
-    return storage.getUsed();
+    long total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getUsed();
+    }
+    return total;
   }
 
   @Override // FSDatasetMBean
   public long getBlockPoolUsed(String bpid) throws IOException {
-    return storage.getBlockPoolUsed(bpid);
+    long total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getBlockPoolUsed(bpid);
+    }
+    return total;
   }
 
   @Override // FSDatasetMBean
   public long getRemaining() {
-    return storage.getFree();
+
+    long total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getFree();
+    }
+    return total;
   }
 
   @Override // FSDatasetMBean
   public int getNumFailedVolumes() {
-    return storage.getNumFailedVolumes();
+
+    int total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getNumFailedVolumes();
+    }
+    return total;
   }
 
   @Override // FSDatasetMBean
@@ -767,8 +869,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override // FsDatasetSpi
   public synchronized long getLength(ExtendedBlock b) throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
@@ -778,34 +879,38 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   @Override
   @Deprecated
   public Replica getReplica(String bpid, long blockId) {
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map != null) {
-      return map.get(new Block(blockId));
+    Block b = new Block(blockId);
+    try {
+      return getBlockMap(b, bpid).get(b);
+    } catch (IOException ioe) {
+      return null;
     }
-    return null;
   }
 
   @Override
   public synchronized String getReplicaString(String bpid, long blockId) {
     Replica r = null;
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map != null) {
-      r = map.get(new Block(blockId));
+    try {
+      Block b = new Block(blockId);
+      r = getBlockMap(b, bpid).get(b);
+    } catch (IOException ioe) {
+      // Ignore
     }
     return r == null? "null": r.toString();
   }
 
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid) throws IOException {
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map != null) {
-      BInfo binfo = map.get(new Block(blkid));
+    Block b = new Block(blkid);
+    try {
+      BInfo binfo = getBlockMap(b, bpid).get(b);
       if (binfo == null) {
         return null;
       }
       return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes());
+    } catch (IOException ioe) {
+      return null;
     }
-    return null;
   }
 
   @Override // FsDatasetSpi
@@ -815,18 +920,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     if (invalidBlks == null) {
       return;
     }
-    final Map<Block, BInfo> map = getMap(bpid);
     for (Block b: invalidBlks) {
       if (b == null) {
         continue;
       }
+      Map<Block, BInfo> map = getBlockMap(b, bpid);
       BInfo binfo = map.get(b);
       if (binfo == null) {
         error = true;
         DataNode.LOG.warn("Invalidate: Missing block");
         continue;
       }
-      storage.free(bpid, binfo.getNumBytes());
+      getStorage(b).free(bpid, binfo.getNumBytes());
       map.remove(b);
       if (datanode != null) {
         datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b),
@@ -856,8 +961,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   private BInfo getBInfo(final ExtendedBlock b) {
-    final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
-    return map == null? null: map.get(b.getLocalBlock());
+    try {
+      return getBlockMap(b).get(b.getLocalBlock());
+    } catch (IOException ioe) {
+      return null;
+    }
   }
 
   @Override // {@link FsDatasetSpi}
@@ -921,8 +1029,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler append(
       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null || !binfo.isFinalized()) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
@@ -934,7 +1041,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler recoverAppend(
       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    final Map<Block, BInfo> map = getBlockMap(b);
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
@@ -952,7 +1059,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   @Override // FsDatasetSpi
   public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
       throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    final Map<Block, BInfo> map = getBlockMap(b);
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
@@ -971,7 +1078,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public synchronized ReplicaHandler recoverRbw(
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    final Map<Block, BInfo> map = getBlockMap(b);
     BInfo binfo = map.get(b.getLocalBlock());
     if ( binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
@@ -1005,16 +1112,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         throw new ReplicaAlreadyExistsException("Block " + b +
             " is being written, and cannot be written to.");
     }
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
-    map.put(binfo.theBlock, binfo);
+    getBlockMap(b).put(binfo.theBlock, binfo);
     return new ReplicaHandler(binfo, null);
   }
 
   protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
       throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );
     }
@@ -1040,8 +1145,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   @Override // FsDatasetSpi
   public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b
       ) throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );
     }
@@ -1229,8 +1333,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
   throws IOException {
     ExtendedBlock b = rBlock.getBlock();
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );
     }
@@ -1245,7 +1348,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
                                         long recoveryId,
                                         long newBlockId,
                                         long newlength) throws IOException {
-    return getMap(oldBlock.getBlockPoolId()).get(oldBlock.getLocalBlock());
+    return getBInfo(oldBlock);
   }
 
   @Override // FsDatasetSpi
@@ -1255,15 +1358,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override // FsDatasetSpi
   public void addBlockPool(String bpid, Configuration conf) {
-    Map<Block, BInfo> map = new HashMap<Block, BInfo>();
-    blockMap.put(bpid, map);
-    storage.addBlockPool(bpid);
+    for (SimulatedStorage storage : storages) {
+      storage.addBlockPool(bpid);
+    }
   }
 
   @Override // FsDatasetSpi
   public void shutdownBlockPool(String bpid) {
-    blockMap.remove(bpid);
-    storage.removeBlockPool(bpid);
+    for (SimulatedStorage storage : storages) {
+      storage.removeBlockPool(bpid);
+    }
   }
 
   @Override // FsDatasetSpi
@@ -1274,11 +1378,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   @Override
   public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
       throws IOException {
-    final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
-    if (map == null) {
-      throw new IOException("Block pool not found, temporary=" + temporary);
-    }
-    final BInfo r = map.get(temporary.getLocalBlock());
+    final BInfo r = getBlockMap(temporary).get(temporary.getLocalBlock());
     if (r == null) {
       throw new IOException("Block not found, temporary=" + temporary);
     } else if (r.isFinalized()) {
@@ -1329,7 +1429,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public FsVolumeReferences getFsVolumeReferences() {
-    return new FsVolumeReferences(Collections.singletonList(volume));
+    List<SimulatedVolume> volumes = new ArrayList<>();
+    for (SimulatedStorage storage : storages) {
+      volumes.add(storage.getVolume());
+    }
+    return new FsVolumeReferences(volumes);
   }
 
   @Override
@@ -1341,14 +1445,21 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public DatanodeStorage getStorage(final String storageUuid) {
-    return storageUuid.equals(storage.getStorageUuid()) ?
-        storage.dnStorage :
-        null;
+    for (SimulatedStorage storage : storages) {
+      if (storageUuid.equals(storage.getStorageUuid())) {
+        return storage.getDnStorage();
+      }
+    }
+    return null;
   }
 
   @Override
   public StorageReport[] getStorageReports(String bpid) {
-    return new StorageReport[] {storage.getStorageReport(bpid)};
+    List<StorageReport> reports = new ArrayList<>();
+    for (SimulatedStorage storage : storages) {
+      reports.add(storage.getStorageReport(bpid));
+    }
+    return reports.toArray(new StorageReport[0]);
   }
 
   @Override
@@ -1363,7 +1474,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public FsVolumeSpi getVolume(ExtendedBlock b) {
-    return volume;
+    return getStorage(b.getLocalBlock()).getVolume();
   }
 
   @Override
@@ -1397,12 +1508,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public void setPinning(ExtendedBlock b) throws IOException {
-    blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
+    getBlockMap(b).get(b.getLocalBlock()).pinned = true;
   }
 
   @Override
   public boolean getPinning(ExtendedBlock b) throws IOException {
-    return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
+    return getBlockMap(b).get(b.getLocalBlock()).pinned;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da4e2f38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 9a77350..62ed710 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -26,20 +26,19 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.blockmanagement.SequentialBlockIdGenerator;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.DataChecksum;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,6 +52,16 @@ public class TestSimulatedFSDataset {
   static final int BLOCK_LENGTH_MULTIPLIER = 79;
   static final long FIRST_BLK_ID = 1;
 
+  private final int storageCount;
+
+  public TestSimulatedFSDataset() {
+    this(1);
+  }
+
+  protected TestSimulatedFSDataset(int storageCount) {
+    this.storageCount = storageCount;
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
@@ -187,43 +196,28 @@ public class TestSimulatedFSDataset {
 
   @Test
   public void testGetBlockReport() throws IOException {
-    SimulatedFSDataset fsdataset = getSimulatedFSDataset(); 
-    BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(0, blockReport.getNumberOfBlocks());
+    final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
+    assertBlockReportCountAndSize(fsdataset, 0);
     addSomeBlocks(fsdataset);
-    blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-    }
+    assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+    assertBlockLengthInBlockReports(fsdataset);
   }
   
   @Test
   public void testInjectionEmpty() throws IOException {
     SimulatedFSDataset fsdataset = getSimulatedFSDataset(); 
-    BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(0, blockReport.getNumberOfBlocks());
+    assertBlockReportCountAndSize(fsdataset, 0);
     int bytesAdded = addSomeBlocks(fsdataset);
-    blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-    }
+    assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+    assertBlockLengthInBlockReports(fsdataset);
     
     // Inject blocks into an empty fsdataset
     //  - injecting the blocks we got above.
     SimulatedFSDataset sfsdataset = getSimulatedFSDataset();
-    sfsdataset.injectBlocks(bpid, blockReport);
-    blockReport = sfsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-      assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
-          .getLength(new ExtendedBlock(bpid, b)));
-    }
+    injectBlocksFromBlockReport(fsdataset, sfsdataset);
+    assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+    assertBlockLengthInBlockReports(fsdataset, sfsdataset);
+
     assertEquals(bytesAdded, sfsdataset.getDfsUsed());
     assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining());
   }
@@ -231,16 +225,10 @@ public class TestSimulatedFSDataset {
   @Test
   public void testInjectionNonEmpty() throws IOException {
     SimulatedFSDataset fsdataset = getSimulatedFSDataset(); 
-    BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(0, blockReport.getNumberOfBlocks());
+    assertBlockReportCountAndSize(fsdataset, 0);
     int bytesAdded = addSomeBlocks(fsdataset);
-    blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-    }
-    fsdataset = null;
+    assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+    assertBlockLengthInBlockReports(fsdataset);
     
     // Inject blocks into an non-empty fsdataset
     //  - injecting the blocks we got above.
@@ -248,19 +236,10 @@ public class TestSimulatedFSDataset {
     // Add come blocks whose block ids do not conflict with
     // the ones we are going to inject.
     bytesAdded += addSomeBlocks(sfsdataset, NUMBLOCKS+1, false);
-    sfsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    sfsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    sfsdataset.injectBlocks(bpid, blockReport);
-    blockReport = sfsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-      assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
-          .getLength(new ExtendedBlock(bpid, b)));
-    }
+    assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS);
+    injectBlocksFromBlockReport(fsdataset, sfsdataset);
+    assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS * 2);
+    assertBlockLengthInBlockReports(fsdataset, sfsdataset);
     assertEquals(bytesAdded, sfsdataset.getDfsUsed());
     assertEquals(sfsdataset.getCapacity()-bytesAdded,  sfsdataset.getRemaining());
     
@@ -270,7 +249,7 @@ public class TestSimulatedFSDataset {
     try {
       sfsdataset = getSimulatedFSDataset();
       sfsdataset.addBlockPool(bpid, conf);
-      sfsdataset.injectBlocks(bpid, blockReport);
+      injectBlocksFromBlockReport(fsdataset, sfsdataset);
       assertTrue("Expected an IO exception", false);
     } catch (IOException e) {
       // ok - as expected
@@ -334,8 +313,68 @@ public class TestSimulatedFSDataset {
       assertTrue(fsdataset.isValidBlock(new ExtendedBlock(bpid, b)));
     }
   }
-  
-  private SimulatedFSDataset getSimulatedFSDataset() {
+
+  /**
+   * Inject all of the blocks returned from sourceFSDataset's block reports
+   * into destinationFSDataset.
+   */
+  private void injectBlocksFromBlockReport(SimulatedFSDataset sourceFSDataset,
+      SimulatedFSDataset destinationFSDataset) throws IOException {
+    for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
+        sourceFSDataset.getBlockReports(bpid).entrySet()) {
+      destinationFSDataset.injectBlocks(bpid, ent.getValue());
+    }
+  }
+
+  /**
+   * Assert that the number of block reports returned from fsdataset matches
+   * {@code storageCount}, and that the total number of blocks is equal to
+   * expectedBlockCount.
+   */
+  private void assertBlockReportCountAndSize(SimulatedFSDataset fsdataset,
+      int expectedBlockCount) {
+    Map<DatanodeStorage, BlockListAsLongs> blockReportMap =
+        fsdataset.getBlockReports(bpid);
+    assertEquals(storageCount, blockReportMap.size());
+    int totalCount = 0;
+    for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
+        blockReportMap.entrySet()) {
+      totalCount += ent.getValue().getNumberOfBlocks();
+    }
+    assertEquals(expectedBlockCount, totalCount);
+  }
+
+  /**
+   * Convenience method to call {@link #assertBlockLengthInBlockReports(
+   * SimulatedFSDataset,SimulatedFSDataset)} with a null second parameter.
+   */
+  private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset)
+      throws IOException {
+    assertBlockLengthInBlockReports(fsdataset, null);
+  }
+
+  /**
+   * Assert that, for all of the blocks in the block report(s) returned from
+   * fsdataset, they are not null and their length matches the expectation.
+   * If otherFSDataset is non-null, additionally confirm that its idea of the
+   * length of the block matches as well.
+   */
+  private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset,
+      SimulatedFSDataset otherFSDataset) throws IOException {
+    for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
+        fsdataset.getBlockReports(bpid).entrySet()) {
+      for (Block b : ent.getValue()) {
+        assertNotNull(b);
+        assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
+        if (otherFSDataset != null) {
+          assertEquals(blockIdToLen(b.getBlockId()), otherFSDataset
+              .getLength(new ExtendedBlock(bpid, b)));
+        }
+      }
+    }
+  }
+
+  protected SimulatedFSDataset getSimulatedFSDataset() {
     SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, conf);
     fsdataset.addBlockPool(bpid, conf);
     return fsdataset;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da4e2f38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java
new file mode 100644
index 0000000..b31ae98
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test that the {@link SimulatedFSDataset} works correctly when configured
+ * with multiple storages.
+ */
+public class TestSimulatedFSDatasetWithMultipleStorages
+    extends TestSimulatedFSDataset {
+
+  public TestSimulatedFSDatasetWithMultipleStorages() {
+    super(2);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, "data1,data2");
+  }
+
+  @Test
+  public void testMultipleStoragesConfigured() {
+    SimulatedFSDataset fsDataset = getSimulatedFSDataset();
+    assertEquals(2, fsDataset.getStorageReports(bpid).length);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org