You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by kk...@apache.org on 2017/12/19 00:09:33 UTC
[42/50] [abbrv] hadoop git commit: HDFS-12818. Support multiple storages in DataNodeCluster / SimulatedFSDataset. Contributed by Erik Krogen.
HDFS-12818. Support multiple storages in DataNodeCluster / SimulatedFSDataset. Contributed by Erik Krogen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/94576b17
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/94576b17
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/94576b17
Branch: refs/heads/YARN-6592
Commit: 94576b17fbc19c440efafb6c3322f53ec78a5b55
Parents: 0010089
Author: Erik Krogen <ek...@linkedin.com>
Authored: Mon Dec 18 11:36:22 2017 -0800
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Mon Dec 18 11:36:22 2017 -0800
----------------------------------------------------------------------
.../server/datanode/SimulatedFSDataset.java | 308 +++++++++++++------
.../server/datanode/TestSimulatedFSDataset.java | 147 +++++----
...tSimulatedFSDatasetWithMultipleStorages.java | 50 +++
3 files changed, 352 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94576b17/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index c31df4c..987ba97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -23,8 +23,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -37,11 +37,13 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
+import com.google.common.math.LongMath;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -88,6 +90,7 @@ import org.apache.hadoop.util.DataChecksum;
*/
public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public final static int BYTE_MASK = 0xff;
+ private final static int DEFAULT_NUM_SIMULATED_DATA_DIRS = 1;
static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
@Override
public SimulatedFSDataset newInstance(DataNode datanode,
@@ -100,10 +103,42 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return true;
}
}
-
+
+ /**
+ * Used to change the default number of data storages and to mark the
+ * FSDataset as simulated.
+ */
+ static class TestUtilsFactory
+ extends FsDatasetTestUtils.Factory<FsDatasetTestUtils> {
+ @Override
+ public FsDatasetTestUtils newInstance(DataNode datanode) {
+ return new FsDatasetImplTestUtils(datanode) {
+ @Override
+ public int getDefaultNumOfDataDirs() {
+ return DEFAULT_NUM_SIMULATED_DATA_DIRS;
+ }
+ };
+ }
+
+ @Override
+ public boolean isSimulated() {
+ return true;
+ }
+
+ @Override
+ public int getDefaultNumOfDataDirs() {
+ return DEFAULT_NUM_SIMULATED_DATA_DIRS;
+ }
+
+ }
+
public static void setFactory(Configuration conf) {
conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
Factory.class.getName());
+ conf.setClass("org.apache.hadoop.hdfs.server.datanode." +
+ "SimulatedFSDatasetTestUtilsFactory",
+ TestUtilsFactory.class, FsDatasetTestUtils.Factory.class
+ );
}
public static byte simulatedByte(Block b, long offsetInBlk) {
@@ -151,7 +186,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
if (theBlock.getNumBytes() < 0) {
theBlock.setNumBytes(0);
}
- if (!storage.alloc(bpid, theBlock.getNumBytes())) {
+ if (!getStorage(theBlock).alloc(bpid, theBlock.getNumBytes())) {
// expected length - actual length may
// be more - we find out at finalize
DataNode.LOG.warn("Lack of free storage on a block alloc");
@@ -169,7 +204,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public String getStorageUuid() {
- return storage.getStorageUuid();
+ return getStorage(theBlock).getStorageUuid();
}
@Override
@@ -226,12 +261,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
// adjust if necessary
long extraLen = finalSize - theBlock.getNumBytes();
if (extraLen > 0) {
- if (!storage.alloc(bpid,extraLen)) {
+ if (!getStorage(theBlock).alloc(bpid, extraLen)) {
DataNode.LOG.warn("Lack of free storage on a block alloc");
throw new IOException("Creating block, no free space available");
}
} else {
- storage.free(bpid, -extraLen);
+ getStorage(theBlock).free(bpid, -extraLen);
}
theBlock.setNumBytes(finalSize);
@@ -271,7 +306,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
- volume, fileIoProvider);
+ getStorage(theBlock).getVolume(), fileIoProvider);
}
}
@@ -368,6 +403,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
*/
private static class SimulatedBPStorage {
private long used; // in bytes
+ private final Map<Block, BInfo> blockMap = new TreeMap<>();
long getUsed() {
return used;
@@ -381,6 +417,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
used -= amount;
}
+ Map<Block, BInfo> getBlockMap() {
+ return blockMap;
+ }
+
SimulatedBPStorage() {
used = 0;
}
@@ -392,10 +432,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
*/
private static class SimulatedStorage {
private final Map<String, SimulatedBPStorage> map =
- new HashMap<String, SimulatedBPStorage>();
+ new ConcurrentHashMap<>();
private final long capacity; // in bytes
private final DatanodeStorage dnStorage;
+ private final SimulatedVolume volume;
synchronized long getFree() {
return capacity - getUsed();
@@ -433,11 +474,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
getBPStorage(bpid).free(amount);
}
- SimulatedStorage(long cap, DatanodeStorage.State state) {
+ SimulatedStorage(long cap, DatanodeStorage.State state,
+ FileIoProvider fileIoProvider, Configuration conf) {
capacity = cap;
dnStorage = new DatanodeStorage(
"SimulatedStorage-" + DatanodeStorage.generateUuid(),
state, StorageType.DEFAULT);
+ DataNodeVolumeMetrics volumeMetrics =
+ DataNodeVolumeMetrics.create(conf, dnStorage.getStorageID());
+ this.volume = new SimulatedVolume(this, fileIoProvider, volumeMetrics);
}
synchronized void addBlockPool(String bpid) {
@@ -473,6 +518,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
false, getCapacity(), getUsed(), getFree(),
map.get(bpid).getUsed(), 0L);
}
+
+ SimulatedVolume getVolume() {
+ return volume;
+ }
+
+ Map<Block, BInfo> getBlockMap(String bpid) throws IOException {
+ SimulatedBPStorage bpStorage = map.get(bpid);
+ if (bpStorage == null) {
+ throw new IOException("Nonexistent block pool: " + bpid);
+ }
+ return bpStorage.getBlockMap();
+ }
}
static class SimulatedVolume implements FsVolumeSpi {
@@ -601,10 +658,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
}
- private final Map<String, Map<Block, BInfo>> blockMap
- = new ConcurrentHashMap<String, Map<Block,BInfo>>();
- private final SimulatedStorage storage;
- private final SimulatedVolume volume;
+ private final List<SimulatedStorage> storages;
private final String datanodeUuid;
private final DataNode datanode;
@@ -615,27 +669,30 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) {
this.datanode = datanode;
- if (storage != null) {
+ int storageCount;
+ if (storage != null && storage.getNumStorageDirs() > 0) {
+ storageCount = storage.getNumStorageDirs();
for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
DataStorage.createStorageID(storage.getStorageDir(i), false, conf);
}
this.datanodeUuid = storage.getDatanodeUuid();
} else {
+ storageCount = DataNode.getStorageLocations(conf).size();
this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid();
}
registerMBean(datanodeUuid);
this.fileIoProvider = new FileIoProvider(conf, datanode);
- this.storage = new SimulatedStorage(
- conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
- conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
-
- // TODO: per volume id or path
- DataNodeVolumeMetrics volumeMetrics = DataNodeVolumeMetrics.create(conf,
- datanodeUuid);
- this.volume = new SimulatedVolume(this.storage, this.fileIoProvider,
- volumeMetrics);
+
this.datasetLock = new AutoCloseableLock();
+
+ this.storages = new ArrayList<>();
+ for (int i = 0; i < storageCount; i++) {
+ this.storages.add(new SimulatedStorage(
+ conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
+ conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE),
+ fileIoProvider, conf));
+ }
}
public synchronized void injectBlocks(String bpid,
@@ -651,33 +708,50 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
throw new IOException("Block already exists in block list");
}
}
- Map<Block, BInfo> map = blockMap.get(bpid);
- if (map == null) {
- map = new TreeMap<>();
- blockMap.put(bpid, map);
+
+ for (SimulatedStorage storage : storages) {
+ storage.addBlockPool(bpid);
}
-
+
for (Block b: injectBlocks) {
BInfo binfo = new BInfo(bpid, b, false);
- map.put(binfo.theBlock, binfo);
+ getBlockMap(b, bpid).put(binfo.theBlock, binfo);
}
}
}
+
+ /** Get the storage that a given block lives within. */
+ private SimulatedStorage getStorage(Block b) {
+ return storages.get(LongMath.mod(b.getBlockId(), storages.size()));
+ }
- /** Get a map for a given block pool Id */
- private Map<Block, BInfo> getMap(String bpid) throws IOException {
- final Map<Block, BInfo> map = blockMap.get(bpid);
- if (map == null) {
- throw new IOException("Non existent blockpool " + bpid);
- }
- return map;
+ /**
+ * Get the block map that a given block lives within, assuming it is within
+ * block pool bpid.
+ * @param b The block to look for
+ * @param bpid The block pool that contains b
+ * @return The block map (non-null)
+ * @throws IOException if bpid does not exist
+ */
+ private Map<Block, BInfo> getBlockMap(Block b, String bpid)
+ throws IOException {
+ return getStorage(b).getBlockMap(bpid);
+ }
+
+ /**
+ * Get the block map that a given block lives within.
+ * @param b The extended block to look for
+ * @return The block map (non-null)
+ * @throws IOException if b is in a nonexistent block pool
+ */
+ private Map<Block, BInfo> getBlockMap(ExtendedBlock b) throws IOException {
+ return getBlockMap(b.getLocalBlock(), b.getBlockPoolId());
}
@Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
throws IOException {
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
- BInfo binfo = map.get(b.getLocalBlock());
+ BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
@@ -687,20 +761,21 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{
if (isValidRbw(b)) {
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
- map.remove(b.getLocalBlock());
+ getBlockMap(b).remove(b.getLocalBlock());
}
}
- synchronized BlockListAsLongs getBlockReport(String bpid) {
+ synchronized BlockListAsLongs getBlockReport(String bpid,
+ SimulatedStorage storage) {
BlockListAsLongs.Builder report = BlockListAsLongs.builder();
- final Map<Block, BInfo> map = blockMap.get(bpid);
- if (map != null) {
- for (BInfo b : map.values()) {
+ try {
+ for (BInfo b : storage.getBlockMap(bpid).values()) {
if (b.isFinalized()) {
report.add(b);
}
}
+ } catch (IOException ioe) {
+ DataNode.LOG.error("Exception while getting block reports", ioe);
}
return report.build();
}
@@ -708,7 +783,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
String bpid) {
- return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid));
+ Map<DatanodeStorage, BlockListAsLongs> blockReports = new HashMap<>();
+ for (SimulatedStorage storage : storages) {
+ blockReports.put(storage.getDnStorage(), getBlockReport(bpid, storage));
+ }
+ return blockReports;
}
@Override // FsDatasetSpi
@@ -718,27 +797,49 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FSDatasetMBean
public long getCapacity() {
- return storage.getCapacity();
+ long total = 0;
+ for (SimulatedStorage storage : storages) {
+ total += storage.getCapacity();
+ }
+ return total;
}
@Override // FSDatasetMBean
public long getDfsUsed() {
- return storage.getUsed();
+ long total = 0;
+ for (SimulatedStorage storage : storages) {
+ total += storage.getUsed();
+ }
+ return total;
}
@Override // FSDatasetMBean
public long getBlockPoolUsed(String bpid) throws IOException {
- return storage.getBlockPoolUsed(bpid);
+ long total = 0;
+ for (SimulatedStorage storage : storages) {
+ total += storage.getBlockPoolUsed(bpid);
+ }
+ return total;
}
@Override // FSDatasetMBean
public long getRemaining() {
- return storage.getFree();
+
+ long total = 0;
+ for (SimulatedStorage storage : storages) {
+ total += storage.getFree();
+ }
+ return total;
}
@Override // FSDatasetMBean
public int getNumFailedVolumes() {
- return storage.getNumFailedVolumes();
+
+ int total = 0;
+ for (SimulatedStorage storage : storages) {
+ total += storage.getNumFailedVolumes();
+ }
+ return total;
}
@Override // FSDatasetMBean
@@ -803,8 +904,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized long getLength(ExtendedBlock b) throws IOException {
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
- BInfo binfo = map.get(b.getLocalBlock());
+ BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
@@ -814,34 +914,38 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
@Deprecated
public Replica getReplica(String bpid, long blockId) {
- final Map<Block, BInfo> map = blockMap.get(bpid);
- if (map != null) {
- return map.get(new Block(blockId));
+ Block b = new Block(blockId);
+ try {
+ return getBlockMap(b, bpid).get(b);
+ } catch (IOException ioe) {
+ return null;
}
- return null;
}
@Override
public synchronized String getReplicaString(String bpid, long blockId) {
Replica r = null;
- final Map<Block, BInfo> map = blockMap.get(bpid);
- if (map != null) {
- r = map.get(new Block(blockId));
+ try {
+ Block b = new Block(blockId);
+ r = getBlockMap(b, bpid).get(b);
+ } catch (IOException ioe) {
+ // Ignore
}
return r == null? "null": r.toString();
}
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid) throws IOException {
- final Map<Block, BInfo> map = blockMap.get(bpid);
- if (map != null) {
- BInfo binfo = map.get(new Block(blkid));
+ Block b = new Block(blkid);
+ try {
+ BInfo binfo = getBlockMap(b, bpid).get(b);
if (binfo == null) {
return null;
}
return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes());
+ } catch (IOException ioe) {
+ return null;
}
- return null;
}
@Override // FsDatasetSpi
@@ -851,18 +955,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
if (invalidBlks == null) {
return;
}
- final Map<Block, BInfo> map = getMap(bpid);
for (Block b: invalidBlks) {
if (b == null) {
continue;
}
+ Map<Block, BInfo> map = getBlockMap(b, bpid);
BInfo binfo = map.get(b);
if (binfo == null) {
error = true;
DataNode.LOG.warn("Invalidate: Missing block");
continue;
}
- storage.free(bpid, binfo.getNumBytes());
+ getStorage(b).free(bpid, binfo.getNumBytes());
map.remove(b);
if (datanode != null) {
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b),
@@ -892,8 +996,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
private BInfo getBInfo(final ExtendedBlock b) {
- final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
- return map == null? null: map.get(b.getLocalBlock());
+ try {
+ return getBlockMap(b).get(b.getLocalBlock());
+ } catch (IOException ioe) {
+ return null;
+ }
}
@Override // {@link FsDatasetSpi}
@@ -957,8 +1064,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized ReplicaHandler append(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
- BInfo binfo = map.get(b.getLocalBlock());
+ BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null || !binfo.isFinalized()) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
@@ -970,7 +1076,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized ReplicaHandler recoverAppend(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ final Map<Block, BInfo> map = getBlockMap(b);
BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new ReplicaNotFoundException("Block " + b
@@ -988,7 +1094,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
throws IOException {
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ final Map<Block, BInfo> map = getBlockMap(b);
BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new ReplicaNotFoundException("Block " + b
@@ -1007,7 +1113,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public synchronized ReplicaHandler recoverRbw(
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ final Map<Block, BInfo> map = getBlockMap(b);
BInfo binfo = map.get(b.getLocalBlock());
if ( binfo == null) {
throw new ReplicaNotFoundException("Block " + b
@@ -1042,16 +1148,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
throw new ReplicaAlreadyExistsException("Block " + b +
" is being written, and cannot be written to.");
}
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
- map.put(binfo.theBlock, binfo);
+ getBlockMap(b).put(binfo.theBlock, binfo);
return new ReplicaHandler(binfo, null);
}
protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
throws IOException {
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
- BInfo binfo = map.get(b.getLocalBlock());
+ BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -1077,8 +1181,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b
) throws IOException {
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
- BInfo binfo = map.get(b.getLocalBlock());
+ BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -1266,8 +1369,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
throws IOException {
ExtendedBlock b = rBlock.getBlock();
- final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
- BInfo binfo = map.get(b.getLocalBlock());
+ BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -1282,7 +1384,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
long recoveryId,
long newBlockId,
long newlength) throws IOException {
- return getMap(oldBlock.getBlockPoolId()).get(oldBlock.getLocalBlock());
+ return getBInfo(oldBlock);
}
@Override // FsDatasetSpi
@@ -1292,15 +1394,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public void addBlockPool(String bpid, Configuration conf) {
- Map<Block, BInfo> map = new TreeMap<>();
- blockMap.put(bpid, map);
- storage.addBlockPool(bpid);
+ for (SimulatedStorage storage : storages) {
+ storage.addBlockPool(bpid);
+ }
}
@Override // FsDatasetSpi
public void shutdownBlockPool(String bpid) {
- blockMap.remove(bpid);
- storage.removeBlockPool(bpid);
+ for (SimulatedStorage storage : storages) {
+ storage.removeBlockPool(bpid);
+ }
}
@Override // FsDatasetSpi
@@ -1311,11 +1414,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock temporary)
throws IOException {
- final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
- if (map == null) {
- throw new IOException("Block pool not found, temporary=" + temporary);
- }
- final BInfo r = map.get(temporary.getLocalBlock());
+ final BInfo r = getBlockMap(temporary).get(temporary.getLocalBlock());
if (r == null) {
throw new IOException("Block not found, temporary=" + temporary);
} else if (r.isFinalized()) {
@@ -1359,7 +1458,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public FsVolumeReferences getFsVolumeReferences() {
- return new FsVolumeReferences(Collections.singletonList(volume));
+ List<SimulatedVolume> volumes = new ArrayList<>();
+ for (SimulatedStorage storage : storages) {
+ volumes.add(storage.getVolume());
+ }
+ return new FsVolumeReferences(volumes);
}
@Override
@@ -1371,14 +1474,21 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public DatanodeStorage getStorage(final String storageUuid) {
- return storageUuid.equals(storage.getStorageUuid()) ?
- storage.dnStorage :
- null;
+ for (SimulatedStorage storage : storages) {
+ if (storageUuid.equals(storage.getStorageUuid())) {
+ return storage.getDnStorage();
+ }
+ }
+ return null;
}
@Override
public StorageReport[] getStorageReports(String bpid) {
- return new StorageReport[] {storage.getStorageReport(bpid)};
+ List<StorageReport> reports = new ArrayList<>();
+ for (SimulatedStorage storage : storages) {
+ reports.add(storage.getStorageReport(bpid));
+ }
+ return reports.toArray(new StorageReport[0]);
}
@Override
@@ -1393,7 +1503,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public FsVolumeSpi getVolume(ExtendedBlock b) {
- return volume;
+ return getStorage(b.getLocalBlock()).getVolume();
}
@Override
@@ -1428,12 +1538,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public void setPinning(ExtendedBlock b) throws IOException {
- blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
+ getBlockMap(b).get(b.getLocalBlock()).pinned = true;
}
@Override
public boolean getPinning(ExtendedBlock b) throws IOException {
- return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
+ return getBlockMap(b).get(b.getLocalBlock()).pinned;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94576b17/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 4775fc7..dde9ad5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -26,20 +26,19 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.blockmanagement.SequentialBlockIdGenerator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.DataChecksum;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -53,6 +52,16 @@ public class TestSimulatedFSDataset {
static final int BLOCK_LENGTH_MULTIPLIER = 79;
static final long FIRST_BLK_ID = 1;
+ private final int storageCount;
+
+ public TestSimulatedFSDataset() {
+ this(1);
+ }
+
+ protected TestSimulatedFSDataset(int storageCount) {
+ this.storageCount = storageCount;
+ }
+
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
@@ -187,43 +196,28 @@ public class TestSimulatedFSDataset {
@Test
public void testGetBlockReport() throws IOException {
- SimulatedFSDataset fsdataset = getSimulatedFSDataset();
- BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
- assertEquals(0, blockReport.getNumberOfBlocks());
+ final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
+ assertBlockReportCountAndSize(fsdataset, 0);
addSomeBlocks(fsdataset);
- blockReport = fsdataset.getBlockReport(bpid);
- assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
- for (Block b: blockReport) {
- assertNotNull(b);
- assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
- }
+ assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+ assertBlockLengthInBlockReports(fsdataset);
}
@Test
public void testInjectionEmpty() throws IOException {
SimulatedFSDataset fsdataset = getSimulatedFSDataset();
- BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
- assertEquals(0, blockReport.getNumberOfBlocks());
+ assertBlockReportCountAndSize(fsdataset, 0);
int bytesAdded = addSomeBlocks(fsdataset);
- blockReport = fsdataset.getBlockReport(bpid);
- assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
- for (Block b: blockReport) {
- assertNotNull(b);
- assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
- }
+ assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+ assertBlockLengthInBlockReports(fsdataset);
// Inject blocks into an empty fsdataset
// - injecting the blocks we got above.
SimulatedFSDataset sfsdataset = getSimulatedFSDataset();
- sfsdataset.injectBlocks(bpid, blockReport);
- blockReport = sfsdataset.getBlockReport(bpid);
- assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
- for (Block b: blockReport) {
- assertNotNull(b);
- assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
- assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
- .getLength(new ExtendedBlock(bpid, b)));
- }
+ injectBlocksFromBlockReport(fsdataset, sfsdataset);
+ assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+ assertBlockLengthInBlockReports(fsdataset, sfsdataset);
+
assertEquals(bytesAdded, sfsdataset.getDfsUsed());
assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining());
}
@@ -231,16 +225,10 @@ public class TestSimulatedFSDataset {
@Test
public void testInjectionNonEmpty() throws IOException {
SimulatedFSDataset fsdataset = getSimulatedFSDataset();
- BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
- assertEquals(0, blockReport.getNumberOfBlocks());
+ assertBlockReportCountAndSize(fsdataset, 0);
int bytesAdded = addSomeBlocks(fsdataset);
- blockReport = fsdataset.getBlockReport(bpid);
- assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
- for (Block b: blockReport) {
- assertNotNull(b);
- assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
- }
- fsdataset = null;
+ assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+ assertBlockLengthInBlockReports(fsdataset);
// Inject blocks into a non-empty fsdataset
// - injecting the blocks we got above.
@@ -248,19 +236,10 @@ public class TestSimulatedFSDataset {
// Add some blocks whose block ids do not conflict with
// the ones we are going to inject.
bytesAdded += addSomeBlocks(sfsdataset, NUMBLOCKS+1, false);
- sfsdataset.getBlockReport(bpid);
- assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
- sfsdataset.getBlockReport(bpid);
- assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
- sfsdataset.injectBlocks(bpid, blockReport);
- blockReport = sfsdataset.getBlockReport(bpid);
- assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks());
- for (Block b: blockReport) {
- assertNotNull(b);
- assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
- assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
- .getLength(new ExtendedBlock(bpid, b)));
- }
+ assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS);
+ injectBlocksFromBlockReport(fsdataset, sfsdataset);
+ assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS * 2);
+ assertBlockLengthInBlockReports(fsdataset, sfsdataset);
assertEquals(bytesAdded, sfsdataset.getDfsUsed());
assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining());
@@ -270,7 +249,7 @@ public class TestSimulatedFSDataset {
try {
sfsdataset = getSimulatedFSDataset();
sfsdataset.addBlockPool(bpid, conf);
- sfsdataset.injectBlocks(bpid, blockReport);
+ injectBlocksFromBlockReport(fsdataset, sfsdataset);
assertTrue("Expected an IO exception", false);
} catch (IOException e) {
// ok - as expected
@@ -334,8 +313,68 @@ public class TestSimulatedFSDataset {
assertTrue(fsdataset.isValidBlock(new ExtendedBlock(bpid, b)));
}
}
-
- private SimulatedFSDataset getSimulatedFSDataset() {
+
+ /**
+ * Inject all of the blocks returned from sourceFSDataset's block reports
+ * into destinationFSDataset. The reports are obtained with
+ * {@code sourceFSDataset.getBlockReports(bpid)}, and each per-storage block
+ * list in the returned map is handed to
+ * {@code destinationFSDataset.injectBlocks(bpid, ...)} in its own call.
+ *
+ * @param sourceFSDataset dataset whose block reports supply the blocks
+ * @param destinationFSDataset dataset that receives the injected blocks
+ * @throws IOException propagated from the calls made here (presumably
+ * from {@code injectBlocks}; its declaration is not visible in this test)
+ */
+ private void injectBlocksFromBlockReport(SimulatedFSDataset sourceFSDataset,
+ SimulatedFSDataset destinationFSDataset) throws IOException {
+ for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
+ sourceFSDataset.getBlockReports(bpid).entrySet()) {
+ destinationFSDataset.injectBlocks(bpid, ent.getValue());
+ }
+ }
+
+ /**
+ * Assert that the number of block reports returned from fsdataset matches
+ * {@code storageCount}, and that the total number of blocks is equal to
+ * expectedBlockCount. The total is computed by summing
+ * {@code getNumberOfBlocks()} over every entry of the map returned by
+ * {@code fsdataset.getBlockReports(bpid)}, so how the blocks are spread
+ * across the individual reports does not affect the result.
+ *
+ * @param fsdataset dataset whose block reports are inspected
+ * @param expectedBlockCount expected sum of the block counts of all reports
+ */
+ private void assertBlockReportCountAndSize(SimulatedFSDataset fsdataset,
+ int expectedBlockCount) {
+ Map<DatanodeStorage, BlockListAsLongs> blockReportMap =
+ fsdataset.getBlockReports(bpid);
+ assertEquals(storageCount, blockReportMap.size());
+ int totalCount = 0;
+ for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
+ blockReportMap.entrySet()) {
+ totalCount += ent.getValue().getNumberOfBlocks();
+ }
+ assertEquals(expectedBlockCount, totalCount);
+ }
+
+ /**
+ * Convenience method to call {@link #assertBlockLengthInBlockReports(
+ * SimulatedFSDataset,SimulatedFSDataset)} with a null second parameter.
+ * With a null second parameter only the lengths recorded in fsdataset's own
+ * block reports are checked; no second dataset is consulted.
+ *
+ * @param fsdataset dataset whose block reports are checked
+ * @throws IOException declared because the two-argument overload declares it
+ */
+ private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset)
+ throws IOException {
+ assertBlockLengthInBlockReports(fsdataset, null);
+ }
+
+ /**
+ * Assert that, for all of the blocks in the block report(s) returned from
+ * fsdataset, they are not null and their length matches the expectation.
+ * If otherFSDataset is non-null, additionally confirm that its idea of the
+ * length of the block matches as well.
+ *
+ * The expected length of a block is {@code blockIdToLen(b.getBlockId())}.
+ * It is compared with {@code b.getNumBytes()} and, when otherFSDataset is
+ * given, with {@code otherFSDataset.getLength(new ExtendedBlock(bpid, b))}.
+ *
+ * @param fsdataset dataset whose block reports supply the blocks to check
+ * @param otherFSDataset optional second dataset queried for each block's
+ * length; may be null to skip that check
+ * @throws IOException propagated from the calls made here (presumably from
+ * {@code getLength}; its declaration is not visible in this test)
+ */
+ private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset,
+ SimulatedFSDataset otherFSDataset) throws IOException {
+ for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
+ fsdataset.getBlockReports(bpid).entrySet()) {
+ for (Block b : ent.getValue()) {
+ assertNotNull(b);
+ assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
+ if (otherFSDataset != null) {
+ assertEquals(blockIdToLen(b.getBlockId()), otherFSDataset
+ .getLength(new ExtendedBlock(bpid, b)));
+ }
+ }
+ }
+ }
+
+ protected SimulatedFSDataset getSimulatedFSDataset() {
SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, conf);
fsdataset.addBlockPool(bpid, conf);
return fsdataset;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94576b17/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java
new file mode 100644
index 0000000..b31ae98
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test that the {@link SimulatedFSDataset} works correctly when configured
+ * with multiple storages.
+ *
+ * This class extends {@link TestSimulatedFSDataset} and passes 2 to its
+ * constructor (presumably the expected storage count; the parent
+ * constructor is not shown here). Its {@code setUp()} first runs the
+ * parent's {@code setUp()} and then sets {@code DFS_DATANODE_DATA_DIR_KEY}
+ * to the two-entry list {@code "data1,data2"}. The one test added here
+ * asserts that a dataset obtained from {@code getSimulatedFSDataset()}
+ * returns exactly two storage reports for {@code bpid}.
+ */
+public class TestSimulatedFSDatasetWithMultipleStorages
+ extends TestSimulatedFSDataset {
+
+ public TestSimulatedFSDatasetWithMultipleStorages() {
+ super(2);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, "data1,data2");
+ }
+
+ @Test
+ public void testMultipleStoragesConfigured() {
+ SimulatedFSDataset fsDataset = getSimulatedFSDataset();
+ assertEquals(2, fsDataset.getStorageReports(bpid).length);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org