Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/29 20:16:38 UTC
svn commit: r1097905 [12/14] - in /hadoop/hdfs/trunk: ./ bin/
src/c++/libhdfs/ src/contrib/hdfsproxy/
src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/j...
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Fri Apr 29 18:16:32 2011
@@ -23,6 +23,7 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import javax.management.NotCompliantMBeanException;
@@ -34,8 +35,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -91,13 +95,14 @@ public class SimulatedFSDataset impleme
SimulatedOutputStream oStream = null;
private long bytesAcked;
private long bytesRcvd;
- BInfo(Block b, boolean forWriting) throws IOException {
+ BInfo(String bpid, Block b, boolean forWriting) throws IOException {
theBlock = new Block(b);
if (theBlock.getNumBytes() < 0) {
theBlock.setNumBytes(0);
}
- if (!storage.alloc(theBlock.getNumBytes())) { // expected length - actual length may
- // be more - we find out at finalize
+ if (!storage.alloc(bpid, theBlock.getNumBytes())) {
+ // expected length - actual length may
+ // be more - we find out at finalize
DataNode.LOG.warn("Lack of free storage on a block alloc");
throw new IOException("Creating block, no free space available");
}
@@ -140,7 +145,8 @@ public class SimulatedFSDataset impleme
}
}
- synchronized void finalizeBlock(long finalSize) throws IOException {
+ synchronized void finalizeBlock(String bpid, long finalSize)
+ throws IOException {
if (finalized) {
throw new IOException(
"Finalizing a block that has already been finalized" +
@@ -161,12 +167,12 @@ public class SimulatedFSDataset impleme
// adjust if necessary
long extraLen = finalSize - theBlock.getNumBytes();
if (extraLen > 0) {
- if (!storage.alloc(extraLen)) {
+ if (!storage.alloc(bpid, extraLen)) {
DataNode.LOG.warn("Lack of free storage on a block alloc");
throw new IOException("Creating block, no free space available");
}
} else {
- storage.free(-extraLen);
+ storage.free(bpid, -extraLen);
}
theBlock.setNumBytes(finalSize);
@@ -259,12 +265,41 @@ public class SimulatedFSDataset impleme
}
}
- static private class SimulatedStorage {
- private long capacity; // in bytes
+ /**
+ * Tracks block pool storage utilization, similar to
+ * {@link BlockPoolSlice}.
+ */
+ private static class SimulatedBPStorage {
private long used; // in bytes
+ long getUsed() {
+ return used;
+ }
+
+ void alloc(long amount) {
+ used += amount;
+ }
+
+ void free(long amount) {
+ used -= amount;
+ }
+
+ SimulatedBPStorage() {
+ used = 0;
+ }
+ }
+
+ /**
+ * Tracks datanode-level storage utilization, similar to
+ * {@link FSVolumeSet}.
+ */
+ private static class SimulatedStorage {
+ private Map<String, SimulatedBPStorage> map =
+ new HashMap<String, SimulatedBPStorage>();
+ private long capacity; // in bytes
+
synchronized long getFree() {
- return capacity - used;
+ return capacity - getUsed();
}
synchronized long getCapacity() {
@@ -272,29 +307,55 @@ public class SimulatedFSDataset impleme
}
synchronized long getUsed() {
+ long used = 0;
+ for (SimulatedBPStorage bpStorage : map.values()) {
+ used += bpStorage.getUsed();
+ }
return used;
}
- synchronized boolean alloc(long amount) {
+ synchronized long getBlockPoolUsed(String bpid) throws IOException {
+ return getBPStorage(bpid).getUsed();
+ }
+
+ synchronized boolean alloc(String bpid, long amount) throws IOException {
if (getFree() >= amount) {
- used += amount;
+ getBPStorage(bpid).alloc(amount);
return true;
- } else {
- return false;
}
+ return false;
}
- synchronized void free(long amount) {
- used -= amount;
+ synchronized void free(String bpid, long amount) throws IOException {
+ getBPStorage(bpid).free(amount);
}
SimulatedStorage(long cap) {
capacity = cap;
- used = 0;
+ }
+
+ synchronized void addBlockPool(String bpid) {
+ SimulatedBPStorage bpStorage = map.get(bpid);
+ if (bpStorage != null) {
+ return;
+ }
+ map.put(bpid, new SimulatedBPStorage());
+ }
+
+ synchronized void removeBlockPool(String bpid) {
+ map.remove(bpid);
+ }
+
+ private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
+ SimulatedBPStorage bpStorage = map.get(bpid);
+ if (bpStorage == null) {
+ throw new IOException("block pool " + bpid + " not found");
+ }
+ return bpStorage;
}
}
- private HashMap<Block, BInfo> blockMap = null;
+ private Map<String, Map<Block, BInfo>> blockMap = null;
private SimulatedStorage storage = null;
private String storageId;
@@ -302,7 +363,9 @@ public class SimulatedFSDataset impleme
setConf(conf);
}
- private SimulatedFSDataset() { // real construction when setConf called.. Uggg
+ // Constructor invoked via reflection
+ @SuppressWarnings("unused")
+ private SimulatedFSDataset() { // real construction happens when setConf() is called
}
public Configuration getConf() {
@@ -316,14 +379,12 @@ public class SimulatedFSDataset impleme
registerMBean(storageId);
storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
- //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() +
- // "Used = " + getDfsUsed() + "Free =" + getRemaining());
-
- blockMap = new HashMap<Block,BInfo>();
+ blockMap = new HashMap<String, Map<Block,BInfo>>();
}
- public synchronized void injectBlocks(Iterable<Block> injectBlocks)
- throws IOException {
+ public synchronized void injectBlocks(String bpid,
+ Iterable<Block> injectBlocks) throws IOException {
+ ExtendedBlock blk = new ExtendedBlock();
if (injectBlocks != null) {
int numInjectedBlocks = 0;
for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
@@ -331,69 +392,95 @@ public class SimulatedFSDataset impleme
if (b == null) {
throw new NullPointerException("Null blocks in block list");
}
- if (isValidBlock(b)) {
+ blk.set(bpid, b);
+ if (isValidBlock(blk)) {
throw new IOException("Block already exists in block list");
}
}
- HashMap<Block, BInfo> oldBlockMap = blockMap;
- blockMap = new HashMap<Block,BInfo>(
- numInjectedBlocks + oldBlockMap.size());
- blockMap.putAll(oldBlockMap);
+ Map<Block, BInfo> map = blockMap.get(bpid);
+ if (map == null) {
+ map = new HashMap<Block, BInfo>();
+ blockMap.put(bpid, map);
+ }
+
for (Block b: injectBlocks) {
- BInfo binfo = new BInfo(b, false);
- blockMap.put(binfo.theBlock, binfo);
+ BInfo binfo = new BInfo(bpid, b, false);
+ map.put(binfo.theBlock, binfo);
}
}
}
+
+ /** Get a map for a given block pool Id */
+ private Map<Block, BInfo> getMap(String bpid) throws IOException {
+ final Map<Block, BInfo> map = blockMap.get(bpid);
+ if (map == null) {
+ throw new IOException("Non existent blockpool " + bpid);
+ }
+ return map;
+ }
- @Override
- public synchronized void finalizeBlock(Block b) throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
- binfo.finalizeBlock(b.getNumBytes());
-
+ binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes());
}
- @Override
- public synchronized void unfinalizeBlock(Block b) throws IOException {
+ @Override // FSDatasetInterface
+ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
if (isValidRbw(b)) {
- blockMap.remove(b);
+ blockMap.get(b.getBlockPoolId()).remove(b.getLocalBlock());
}
}
@Override
- public synchronized BlockListAsLongs getBlockReport() {
- Block[] blockTable = new Block[blockMap.size()];
- int count = 0;
- for (BInfo b : blockMap.values()) {
- if (b.isFinalized()) {
- blockTable[count++] = b.theBlock;
+ public synchronized BlockListAsLongs getBlockReport(String bpid) {
+ final Map<Block, BInfo> map = blockMap.get(bpid);
+ Block[] blockTable;
+ if (map != null) {
+ blockTable = new Block[map.size()]; // size the array only after the null check
+ int count = 0;
+ for (BInfo b : map.values()) {
+ if (b.isFinalized()) {
+ blockTable[count++] = b.theBlock;
+ }
}
- }
- if (count != blockTable.length) {
- blockTable = Arrays.copyOf(blockTable, count);
+ if (count != blockTable.length) {
+ blockTable = Arrays.copyOf(blockTable, count);
+ }
+ } else {
+ blockTable = new Block[0];
}
return new BlockListAsLongs(
new ArrayList<Block>(Arrays.asList(blockTable)), null);
}
+ @Override // FSDatasetMBean
public long getCapacity() throws IOException {
return storage.getCapacity();
}
+ @Override // FSDatasetMBean
public long getDfsUsed() throws IOException {
return storage.getUsed();
}
+ @Override // FSDatasetMBean
+ public long getBlockPoolUsed(String bpid) throws IOException {
+ return storage.getBlockPoolUsed(bpid);
+ }
+
+ @Override // FSDatasetMBean
public long getRemaining() throws IOException {
return storage.getFree();
}
- @Override
- public synchronized long getLength(Block b) throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public synchronized long getLength(ExtendedBlock b) throws IOException {
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
@@ -402,65 +489,84 @@ public class SimulatedFSDataset impleme
@Override
@Deprecated
- public Replica getReplica(long blockId) {
- return blockMap.get(new Block(blockId));
+ public Replica getReplica(String bpid, long blockId) {
+ final Map<Block, BInfo> map = blockMap.get(bpid);
+ if (map != null) {
+ return map.get(new Block(blockId));
+ }
+ return null;
}
@Override
- public synchronized String getReplicaString(long blockId) {
- final Replica r = blockMap.get(new Block(blockId));
+ public synchronized String getReplicaString(String bpid, long blockId) {
+ Replica r = null;
+ final Map<Block, BInfo> map = blockMap.get(bpid);
+ if (map != null) {
+ r = map.get(new Block(blockId));
+ }
return r == null? "null": r.toString();
}
- @Override
- public Block getStoredBlock(long blkid) throws IOException {
- Block b = new Block(blkid);
- BInfo binfo = blockMap.get(b);
- if (binfo == null) {
- return null;
+ @Override // FSDatasetInterface
+ public Block getStoredBlock(String bpid, long blkid) throws IOException {
+ final Map<Block, BInfo> map = blockMap.get(bpid);
+ if (map != null) {
+ BInfo binfo = map.get(new Block(blkid));
+ if (binfo == null) {
+ return null;
+ }
+ return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes());
}
- b.setGenerationStamp(binfo.getGenerationStamp());
- b.setNumBytes(binfo.getNumBytes());
- return b;
+ return null;
}
- @Override
- public synchronized void invalidate(Block[] invalidBlks) throws IOException {
+ @Override // FSDatasetInterface
+ public synchronized void invalidate(String bpid, Block[] invalidBlks)
+ throws IOException {
boolean error = false;
if (invalidBlks == null) {
return;
}
+ final Map<Block, BInfo> map = getMap(bpid);
for (Block b: invalidBlks) {
if (b == null) {
continue;
}
- BInfo binfo = blockMap.get(b);
+ BInfo binfo = map.get(b);
if (binfo == null) {
error = true;
DataNode.LOG.warn("Invalidate: Missing block");
continue;
}
- storage.free(binfo.getNumBytes());
+ storage.free(bpid, binfo.getNumBytes());
- blockMap.remove(b);
+ map.remove(b);
}
- if (error) {
- throw new IOException("Invalidate: Missing blocks.");
- }
+ if (error) {
+ throw new IOException("Invalidate: Missing blocks.");
+ }
}
- @Override
- public synchronized boolean isValidBlock(Block b) {
- // return (blockMap.containsKey(b));
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public synchronized boolean isValidBlock(ExtendedBlock b) {
+ final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
+ if (map == null) {
+ return false;
+ }
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
return false;
}
return binfo.isFinalized();
}
+ /* check if a block is created but not finalized */
@Override
- public synchronized boolean isValidRbw(Block b) {
- BInfo binfo = blockMap.get(b);
+ public synchronized boolean isValidRbw(ExtendedBlock b) {
+ final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
+ if (map == null) {
+ return false;
+ }
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
return false;
}
@@ -472,10 +578,11 @@ public class SimulatedFSDataset impleme
return getStorageInfo();
}
- @Override
- public synchronized ReplicaInPipelineInterface append(Block b,
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
- BInfo binfo = blockMap.get(b);
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null || !binfo.isFinalized()) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
@@ -484,10 +591,11 @@ public class SimulatedFSDataset impleme
return binfo;
}
- @Override
- public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
- BInfo binfo = blockMap.get(b);
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
@@ -495,32 +603,34 @@ public class SimulatedFSDataset impleme
if (binfo.isFinalized()) {
binfo.unfinalizeBlock();
}
- blockMap.remove(b);
+ map.remove(b.getLocalBlock());
binfo.theBlock.setGenerationStamp(newGS);
- blockMap.put(binfo.theBlock, binfo);
+ map.put(binfo.theBlock, binfo);
return binfo;
}
- @Override
- public void recoverClose(Block b, long newGS,
- long expectedBlockLen) throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+ throws IOException {
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
}
if (!binfo.isFinalized()) {
- binfo.finalizeBlock(binfo.getNumBytes());
+ binfo.finalizeBlock(b.getBlockPoolId(), binfo.getNumBytes());
}
- blockMap.remove(b);
+ map.remove(b.getLocalBlock());
binfo.theBlock.setGenerationStamp(newGS);
- blockMap.put(binfo.theBlock, binfo);
+ map.put(binfo.theBlock, binfo);
}
- @Override
- public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
- BInfo binfo = blockMap.get(b);
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if ( binfo == null) {
throw new ReplicaNotFoundException("Block " + b
+ " does not exist, and cannot be appended to.");
@@ -529,20 +639,20 @@ public class SimulatedFSDataset impleme
throw new ReplicaAlreadyExistsException("Block " + b
+ " is valid, and cannot be written to.");
}
- blockMap.remove(b);
+ map.remove(b.getLocalBlock());
binfo.theBlock.setGenerationStamp(newGS);
- blockMap.put(binfo.theBlock, binfo);
+ map.put(binfo.theBlock, binfo);
return binfo;
}
- @Override
- public synchronized ReplicaInPipelineInterface createRbw(Block b)
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
throws IOException {
return createTemporary(b);
}
- @Override
- public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
throws IOException {
if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b +
@@ -552,35 +662,36 @@ public class SimulatedFSDataset impleme
throw new ReplicaAlreadyExistsException("Block " + b +
" is being written, and cannot be written to.");
}
- BInfo binfo = new BInfo(b, true);
- blockMap.put(binfo.theBlock, binfo);
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
+ map.put(binfo.theBlock, binfo);
return binfo;
}
- @Override
- public synchronized InputStream getBlockInputStream(Block b)
- throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b)
+ throws IOException {
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
- //DataNode.LOG.info("Opening block(" + b.blkid + ") of length " + b.len);
return binfo.getIStream();
}
- @Override
- public synchronized InputStream getBlockInputStream(Block b, long seekOffset)
- throws IOException {
+ @Override // FSDatasetInterface
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+ long seekOffset) throws IOException {
InputStream result = getBlockInputStream(b);
result.skip(seekOffset);
return result;
}
/** Not supported */
- @Override
- public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff
- ) throws IOException {
+ @Override // FSDatasetInterface
+ public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
+ long ckoff) throws IOException {
throw new IOException("Not supported");
}
@@ -591,9 +702,10 @@ public class SimulatedFSDataset impleme
* @throws IOException - block does not exist or problems accessing
* the meta file
*/
- private synchronized InputStream getMetaDataInStream(Block b)
+ private synchronized InputStream getMetaDataInStream(ExtendedBlock b)
throws IOException {
- BInfo binfo = blockMap.get(b);
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -604,9 +716,11 @@ public class SimulatedFSDataset impleme
return binfo.getMetaIStream();
}
- @Override
- public synchronized long getMetaDataLength(Block b) throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public synchronized long getMetaDataLength(ExtendedBlock b)
+ throws IOException {
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -617,16 +731,15 @@ public class SimulatedFSDataset impleme
return binfo.getMetaIStream().getLength();
}
- @Override
- public MetaDataInputStream getMetaDataInputStream(Block b)
- throws IOException {
-
- return new MetaDataInputStream(getMetaDataInStream(b),
- getMetaDataLength(b));
+ @Override // FSDatasetInterface
+ public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
+ throws IOException {
+ return new MetaDataInputStream(getMetaDataInStream(b),
+ getMetaDataLength(b));
}
- @Override
- public synchronized boolean metaFileExists(Block b) throws IOException {
+ @Override // FSDatasetInterface
+ public synchronized boolean metaFileExists(ExtendedBlock b) throws IOException {
if (!isValidBlock(b)) {
throw new IOException("Block " + b +
" is valid, and cannot be written to.");
@@ -638,8 +751,8 @@ public class SimulatedFSDataset impleme
// nothing to check for simulated data set
}
- @Override
- public synchronized void adjustCrcChannelPosition(Block b,
+ @Override // FSDatasetInterface
+ public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
BlockWriteStreams stream,
int checksumSize)
throws IOException {
@@ -812,8 +925,9 @@ public class SimulatedFSDataset impleme
@Override
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
throws IOException {
- Block b = rBlock.getBlock();
- BInfo binfo = blockMap.get(b);
+ ExtendedBlock b = rBlock.getBlock();
+ final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+ BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -824,22 +938,44 @@ public class SimulatedFSDataset impleme
}
@Override // FSDatasetInterface
- public FinalizedReplica updateReplicaUnderRecovery(Block oldBlock,
+ public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newlength) throws IOException {
return new FinalizedReplica(
oldBlock.getBlockId(), newlength, recoveryId, null, null);
}
- @Override
- public long getReplicaVisibleLength(Block block) throws IOException {
+ @Override // FSDatasetInterface
+ public long getReplicaVisibleLength(ExtendedBlock block) throws IOException {
return block.getNumBytes();
}
+ @Override // FSDatasetInterface
+ public void addBlockPool(String bpid, Configuration conf) {
+ Map<Block, BInfo> map = new HashMap<Block, BInfo>();
+ blockMap.put(bpid, map);
+ storage.addBlockPool(bpid);
+ }
+
+ @Override // FSDatasetInterface
+ public void shutdownBlockPool(String bpid) {
+ blockMap.remove(bpid);
+ storage.removeBlockPool(bpid);
+ }
+
+ @Override // FSDatasetInterface
+ public void deleteBlockPool(String bpid, boolean force) {
+ return;
+ }
+
@Override
- public ReplicaInPipelineInterface convertTemporaryToRbw(Block temporary)
+ public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
throws IOException {
- final BInfo r = blockMap.get(temporary);
+ final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
+ if (map == null) {
+ throw new IOException("Block pool not found, temporary=" + temporary);
+ }
+ final BInfo r = map.get(temporary.getLocalBlock());
if (r == null) {
throw new IOException("Block not found, temporary=" + temporary);
} else if (r.isFinalized()) {
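For illustration, the per-block-pool accounting introduced above can be exercised in isolation. The following is a minimal standalone sketch of the SimulatedStorage/SimulatedBPStorage pattern (one capacity budget per datanode, usage tracked per block pool); the class below is my own distillation, not the committed code:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sketch: one shared capacity budget, per-pool usage counters.
class BPStorageSketch {
  private final Map<String, Long> used = new HashMap<String, Long>();
  private final long capacity;

  BPStorageSketch(long capacity) { this.capacity = capacity; }

  synchronized void addBlockPool(String bpid) {
    if (!used.containsKey(bpid)) {
      used.put(bpid, 0L);
    }
  }

  synchronized long getUsed() {
    long total = 0;
    for (long u : used.values()) { // sum usage across all pools
      total += u;
    }
    return total;
  }

  synchronized boolean alloc(String bpid, long amount) throws IOException {
    Long u = used.get(bpid);
    if (u == null) {
      throw new IOException("block pool " + bpid + " not found");
    }
    if (capacity - getUsed() < amount) {
      return false; // not enough free space left on the datanode
    }
    used.put(bpid, u + amount);
    return true;
  }

  public static void main(String[] args) throws IOException {
    BPStorageSketch s = new BPStorageSketch(1024);
    s.addBlockPool("BP-1");
    s.addBlockPool("BP-2");
    System.out.println(s.alloc("BP-1", 512));  // true
    System.out.println(s.alloc("BP-2", 1024)); // false: only 512 bytes free
  }
}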
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Fri Apr 29 18:16:32 2011
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.REPLACE_BLOCK;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.*;
import java.io.DataInputStream;
@@ -41,9 +40,9 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
/**
* This class tests if block replacement request to data nodes work correctly.
@@ -120,7 +118,7 @@ public class TestBlockReplacement extend
LocatedBlock block = locatedBlocks.get(0);
DatanodeInfo[] oldNodes = block.getLocations();
assertEquals(oldNodes.length, 3);
- Block b = block.getBlock();
+ ExtendedBlock b = block.getBlock();
// add a new datanode to the cluster
cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS);
@@ -161,11 +159,11 @@ public class TestBlockReplacement extend
// start to replace the block
// case 1: proxySource does not contain the block
LOG.info("Testcase 1: Proxy " + newNode.getName()
- + " does not contain the block " + b.getBlockName() );
+ + " does not contain the block " + b);
assertFalse(replaceBlock(b, source, newNode, proxies.get(0)));
// case 2: destination contains the block
LOG.info("Testcase 2: Destination " + proxies.get(1).getName()
- + " contains the block " + b.getBlockName() );
+ + " contains the block " + b);
assertFalse(replaceBlock(b, source, proxies.get(0), proxies.get(1)));
// case 3: correct case
LOG.info("Testcase 3: Proxy=" + source.getName() + " source=" +
@@ -224,7 +222,7 @@ public class TestBlockReplacement extend
*
* Return true if a block is successfully copied; otherwise false.
*/
- private boolean replaceBlock( Block block, DatanodeInfo source,
+ private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
Socket sock = new Socket();
sock.connect(NetUtils.createSocketAddr(
@@ -232,13 +230,8 @@ public class TestBlockReplacement extend
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
- out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- REPLACE_BLOCK.write(out);
- out.writeLong(block.getBlockId());
- out.writeLong(block.getGenerationStamp());
- Text.writeString(out, source.getStorageID());
- sourceProxy.write(out);
- BlockTokenSecretManager.DUMMY_TOKEN.write(out);
+ DataTransferProtocol.Sender.opReplaceBlock(out, block, source
+ .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
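The replaceBlock() change above swaps hand-rolled wire framing (version short, opcode, block id, generation stamp, storage id, token) for the DataTransferProtocol.Sender helper, which also carries the block pool id inside ExtendedBlock. A condensed sketch of the request side, assuming an already-connected socket; only the opReplaceBlock call and its arguments are taken from the patch, while the wrapper method is illustrative:

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;

class ReplaceBlockSketch {
  // Send a REPLACE_BLOCK request; Sender writes the protocol version,
  // the opcode, and the ExtendedBlock (including its block pool id).
  static void sendReplaceBlock(Socket sock, ExtendedBlock block,
      DatanodeInfo source, DatanodeInfo sourceProxy) throws IOException {
    DataOutputStream out = new DataOutputStream(sock.getOutputStream());
    DataTransferProtocol.Sender.opReplaceBlock(out, block,
        source.getStorageID(), sourceProxy,
        BlockTokenSecretManager.DUMMY_TOKEN); // tests use the dummy token
    out.flush();
  }
}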
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Fri Apr 29 18:16:32 2011
@@ -28,12 +28,14 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
@@ -67,6 +69,7 @@ public class TestBlockReport {
static final int BLOCK_SIZE = 1024;
static final int NUM_BLOCKS = 10;
static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
+ static String bpid;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
@@ -85,6 +88,7 @@ public class TestBlockReport {
REPL_FACTOR = 1; //Reset if case a test has modified the value
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
fs = (DistributedFileSystem) cluster.getFileSystem();
+ bpid = cluster.getNamesystem().getBlockPoolId();
}
@After
@@ -130,8 +134,11 @@ public class TestBlockReport {
b.getNumBytes());
}
}
- cluster.getNameNode().blockReport(
- cluster.getDataNodes().get(DN_N0).dnRegistration,
+ // all blocks belong to the same file, hence same BP
+ DataNode dn = cluster.getDataNodes().get(DN_N0);
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+ cluster.getNameNode().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
List<LocatedBlock> blocksAfterReport =
@@ -143,7 +150,7 @@ public class TestBlockReport {
}
for (int i = 0; i < blocksAfterReport.size(); i++) {
- Block b = blocksAfterReport.get(i).getBlock();
+ ExtendedBlock b = blocksAfterReport.get(i).getBlock();
assertEquals("Length of " + i + "th block is incorrect",
oldLengths[i], b.getNumBytes());
}
@@ -171,7 +178,7 @@ public class TestBlockReport {
File dataDir = new File(cluster.getDataDirectory());
assertTrue(dataDir.isDirectory());
- List<Block> blocks2Remove = new ArrayList<Block>();
+ List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
List<Integer> removedIndex = new ArrayList<Integer>();
List<LocatedBlock> lBlocks = cluster.getNameNode().getBlockLocations(
filePath.toString(), FILE_START,
@@ -192,7 +199,7 @@ public class TestBlockReport {
LOG.debug("Number of blocks allocated " + lBlocks.size());
}
- for (Block b : blocks2Remove) {
+ for (ExtendedBlock b : blocks2Remove) {
if(LOG.isDebugEnabled()) {
LOG.debug("Removing the block " + b.getBlockName());
}
@@ -206,8 +213,11 @@ public class TestBlockReport {
waitTil(DN_RESCAN_EXTRA_WAIT);
- cluster.getNameNode().blockReport(
- cluster.getDataNodes().get(DN_N0).dnRegistration,
+ // all blocks belong to the same file, hence same BP
+ DataNode dn = cluster.getDataNodes().get(DN_N0);
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+ cluster.getNameNode().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
cluster.getNamesystem().computeDatanodeWork();
@@ -241,9 +251,12 @@ public class TestBlockReport {
blocks.get(0).setGenerationStamp(rand.nextLong());
// This new block is unknown to NN and will be mark for deletion.
blocks.add(new Block());
- DatanodeCommand dnCmd =
- cluster.getNameNode().blockReport(
- cluster.getDataNodes().get(DN_N0).dnRegistration,
+
+ // all blocks belong to the same file, hence same BP
+ DataNode dn = cluster.getDataNodes().get(DN_N0);
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+ DatanodeCommand dnCmd = cluster.getNameNode().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
if(LOG.isDebugEnabled()) {
LOG.debug("Got the command: " + dnCmd);
@@ -291,9 +304,12 @@ public class TestBlockReport {
ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
startDNandWait(filePath, true);
- cluster.getNameNode().blockReport(
- cluster.getDataNodes().get(DN_N1).dnRegistration,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ // all blocks belong to the same file, hence same BP
+ DataNode dn = cluster.getDataNodes().get(DN_N1);
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+ cluster.getNameNode().blockReport(dnR, poolId,
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
assertEquals("Wrong number of PendingReplication Blocks",
0, cluster.getNamesystem().getUnderReplicatedBlocks());
@@ -327,8 +343,7 @@ public class TestBlockReport {
int randIndex = rand.nextInt(blocks.size());
// Get a block and screw its GS
Block corruptedBlock = blocks.get(randIndex);
- String secondNode = cluster.getDataNodes().get(DN_N1).
- getDatanodeRegistration().getStorageID();
+ String secondNode = cluster.getDataNodes().get(DN_N1).getStorageId();
if(LOG.isDebugEnabled()) {
LOG.debug("Working with " + secondNode);
LOG.debug("BlockGS before " + blocks.get(randIndex).getGenerationStamp());
@@ -338,9 +353,12 @@ public class TestBlockReport {
LOG.debug("BlockGS after " + blocks.get(randIndex).getGenerationStamp());
LOG.debug("Done corrupting GS of " + corruptedBlock.getBlockName());
}
- cluster.getNameNode().blockReport(
- cluster.getDataNodes().get(DN_N1).dnRegistration,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ // all blocks belong to the same file, hence same BP
+ DataNode dn = cluster.getDataNodes().get(DN_N1);
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+ cluster.getNameNode().blockReport(dnR, poolId,
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
assertEquals("Wrong number of Corrupted blocks",
1, cluster.getNamesystem().getCorruptReplicaBlocks() +
@@ -360,9 +378,9 @@ public class TestBlockReport {
if(LOG.isDebugEnabled()) {
LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
}
- cluster.getNameNode().blockReport(
- cluster.getDataNodes().get(DN_N1).dnRegistration,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+
+ cluster.getNameNode().blockReport(dnR, poolId,
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
assertEquals("Wrong number of Corrupted blocks",
@@ -406,10 +424,13 @@ public class TestBlockReport {
bc.start();
waitForTempReplica(bl, DN_N1);
-
- cluster.getNameNode().blockReport(
- cluster.getDataNodes().get(DN_N1).dnRegistration,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+
+ // all blocks belong to the same file, hence same BP
+ DataNode dn = cluster.getDataNodes().get(DN_N1);
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+ cluster.getNameNode().blockReport(dnR, poolId,
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
assertEquals("Wrong number of PendingReplication blocks",
blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@@ -450,9 +471,12 @@ public class TestBlockReport {
waitForTempReplica(bl, DN_N1);
- cluster.getNameNode().blockReport(
- cluster.getDataNodes().get(DN_N1).dnRegistration,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ // all blocks belong to the same file, hence same BP
+ DataNode dn = cluster.getDataNodes().get(DN_N1);
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+ cluster.getNameNode().blockReport(dnR, poolId,
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs());
printStats();
assertEquals("Wrong number of PendingReplication blocks",
2, cluster.getNamesystem().getPendingReplicationBlocks());
@@ -465,7 +489,7 @@ public class TestBlockReport {
}
}
- private void waitForTempReplica(Block bl, int DN_N1) {
+ private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
final boolean tooLongWait = false;
final int TIMEOUT = 40000;
@@ -478,16 +502,18 @@ public class TestBlockReport {
if(LOG.isDebugEnabled()) {
LOG.debug("Total number of DNs " + cluster.getDataNodes().size());
}
+ cluster.waitActive();
+
// Look about specified DN for the replica of the block from 1st DN
- Replica r;
- r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
- fetchReplicaInfo(bl.getBlockId());
+ String bpid = cluster.getNamesystem().getBlockPoolId();
+ Replica r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
+ fetchReplicaInfo(bpid, bl.getBlockId());
long start = System.currentTimeMillis();
int count = 0;
while (r == null) {
waitTil(5);
r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
- fetchReplicaInfo(bl.getBlockId());
+ fetchReplicaInfo(bpid, bl.getBlockId());
long waiting_period = System.currentTimeMillis() - start;
if (count++ % 100 == 0)
if(LOG.isDebugEnabled()) {
@@ -548,8 +574,8 @@ public class TestBlockReport {
if(LOG.isDebugEnabled()) {
LOG.debug("New datanode "
- + cluster.getDataNodes().get(datanodes.size() - 1)
- .getDatanodeRegistration() + " has been started");
+ + cluster.getDataNodes().get(datanodes.size() - 1).getMachineName()
+ + " has been started");
}
if (waitReplicas) DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR);
}
@@ -593,7 +619,7 @@ public class TestBlockReport {
}
continue;
}
- newList.add(new Block(locatedBlks.get(i).getBlock()));
+ newList.add(new Block(locatedBlks.get(i).getBlock().getLocalBlock()));
}
return newList;
}
@@ -685,7 +711,8 @@ public class TestBlockReport {
// Get block from the first DN
ret = cluster.getDataNodes().get(DN_N0).
- data.getStoredBlock(lb.getBlock().getBlockId());
+ data.getStoredBlock(lb.getBlock()
+ .getBlockPoolId(), lb.getBlock().getBlockId());
return ret;
}
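The same four-line sequence now recurs before every blockReport() call in TestBlockReport: look up the datanode, the block pool id, and the per-pool registration. A small helper would capture it; the names below come from the patch, while the helper itself is my own consolidation, not part of the commit:

// Hypothetical helper: send a block report for one datanode's single
// block pool. A datanode now registers once per block pool it serves,
// so the report must name both the registration and the pool.
private void sendBlockReport(int dnIndex, ArrayList<Block> blocks)
    throws IOException {
  DataNode dn = cluster.getDataNodes().get(dnIndex);
  String poolId = cluster.getNamesystem().getBlockPoolId();
  DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
  cluster.getNameNode().blockReport(dnR, poolId,
      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
}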
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java Fri Apr 29 18:16:32 2011
@@ -44,6 +44,9 @@ public class TestDataNodeMXBean {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName("HadoopInfo:type=DataNodeInfo");
+ // get attribute "ClusterId"
+ String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
+ Assert.assertEquals(datanode.getClusterId(), clusterId);
// get attribute "Version"
String version = (String)mbs.getAttribute(mxbeanName, "Version");
Assert.assertEquals(datanode.getVersion(),version);
@@ -53,10 +56,10 @@ public class TestDataNodeMXBean {
// get attribute "HttpPort"
String httpPort = (String)mbs.getAttribute(mxbeanName, "HttpPort");
Assert.assertEquals(datanode.getHttpPort(),httpPort);
- // get attribute "NamenodeAddress"
- String namenodeAddress = (String)mbs.getAttribute(mxbeanName,
- "NamenodeAddress");
- Assert.assertEquals(datanode.getNamenodeAddress(),namenodeAddress);
+ // get attribute "NamenodeAddresses"
+ String namenodeAddresses = (String)mbs.getAttribute(mxbeanName,
+ "NamenodeAddresses");
+ Assert.assertEquals(datanode.getNamenodeAddresses(),namenodeAddresses);
// get attribute "getVolumeInfo"
String volumeInfo = (String)mbs.getAttribute(mxbeanName, "VolumeInfo");
Assert.assertEquals(replaceDigits(datanode.getVolumeInfo()),
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Fri Apr 29 18:16:32 2011
@@ -33,12 +33,13 @@ import org.apache.hadoop.hdfs.BlockReade
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
@@ -119,7 +120,8 @@ public class TestDataNodeVolumeFailure {
// fail the volume
// delete/make non-writable one of the directories (failed volume)
data_fail = new File(dataDir, "data3");
- failedDir = new File(data_fail, MiniDFSCluster.FINALIZED_DIR_NAME);
+ failedDir = MiniDFSCluster.getFinalizedDir(dataDir,
+ cluster.getNamesystem().getBlockPoolId());
if (failedDir.exists() &&
//!FileUtil.fullyDelete(failedDir)
!deteteBlocks(failedDir)
@@ -137,8 +139,10 @@ public class TestDataNodeVolumeFailure {
// make sure a block report is sent
DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
- long[] bReport = dn.getFSDataset().getBlockReport().getBlockListAsLongs();
- cluster.getNameNode().blockReport(dn.dnRegistration, bReport);
+ String bpid = cluster.getNamesystem().getBlockPoolId();
+ DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
+ long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
+ cluster.getNameNode().blockReport(dnR, bpid, bReport);
// verify number of blocks and files...
verify(filename, filesize);
@@ -216,7 +220,7 @@ public class TestDataNodeVolumeFailure {
for (LocatedBlock lb : locatedBlocks) {
DatanodeInfo dinfo = lb.getLocations()[1];
- Block b = lb.getBlock();
+ ExtendedBlock b = lb.getBlock();
try {
accessBlock(dinfo, lb);
} catch (IOException e) {
@@ -254,8 +258,7 @@ public class TestDataNodeVolumeFailure {
throws IOException {
InetSocketAddress targetAddr = null;
Socket s = null;
- BlockReader blockReader = null;
- Block block = lblock.getBlock();
+ ExtendedBlock block = lblock.getBlock();
targetAddr = NetUtils.createSocketAddr(datanode.getName());
@@ -263,8 +266,10 @@ public class TestDataNodeVolumeFailure {
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- String file = BlockReader.getFileName(targetAddr, block.getBlockId());
- blockReader =
+ String file = BlockReader.getFileName(targetAddr,
+ "test-blockpoolid",
+ block.getBlockId());
+ BlockReader blockReader =
BlockReader.newBlockReader(s, file, block, lblock
.getBlockToken(), 0, -1, 4096);
@@ -314,9 +319,11 @@ public class TestDataNodeVolumeFailure {
*/
private int countRealBlocks(Map<String, BlockLocs> map) {
int total = 0;
+ final String bpid = cluster.getNamesystem().getBlockPoolId();
for(int i=0; i<dn_num; i++) {
- for(int j=1; j<=2; j++) {
- File dir = new File(dataDir, "data"+(2*i+j)+MiniDFSCluster.FINALIZED_DIR_NAME);
+ for(int j=0; j<=1; j++) {
+ File storageDir = MiniDFSCluster.getStorageDir(i, j);
+ File dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
if(dir == null) {
System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
continue;
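A recurring theme in these test changes: block files no longer live directly under data<N>/current, but under a per-block-pool subtree, so paths are resolved through MiniDFSCluster helpers. A sketch of the lookup pattern (getStorageDir and getFinalizedDir appear in the patch; the loop bounds mirror the two-volumes-per-datanode layout used by the test):

String bpid = cluster.getNamesystem().getBlockPoolId();
for (int dn = 0; dn < dnNum; dn++) {
  for (int vol = 0; vol <= 1; vol++) { // two volumes per datanode
    File storageDir = MiniDFSCluster.getStorageDir(dn, vol);
    File finalized = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
    // ... inspect block files under 'finalized' ...
  }
}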
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java Fri Apr 29 18:16:32 2011
@@ -147,9 +147,9 @@ public class TestDataNodeVolumeFailureRe
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file1, (short)3);
ArrayList<DataNode> dns = cluster.getDataNodes();
- assertTrue("DN1 should be up", DataNode.isDatanodeUp(dns.get(0)));
- assertTrue("DN2 should be up", DataNode.isDatanodeUp(dns.get(1)));
- assertTrue("DN3 should be up", DataNode.isDatanodeUp(dns.get(2)));
+ assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
+ assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
+ assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
/*
* The metrics should confirm the volume failures.
@@ -188,7 +188,7 @@ public class TestDataNodeVolumeFailureRe
Path file2 = new Path("/test2");
DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file2, (short)3);
- assertTrue("DN3 should still be up", DataNode.isDatanodeUp(dns.get(2)));
+ assertTrue("DN3 should still be up", dns.get(2).isDatanodeUp());
assertEquals("Vol3 should report 1 failure",
1, metrics3.volumesFailed.getCurrentIntervalValue());
live.clear();
@@ -233,7 +233,7 @@ public class TestDataNodeVolumeFailureRe
DFSTestUtil.createFile(fs, file3, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file3, (short)2);
// Eventually the DN should go down
- while (DataNode.isDatanodeUp(dns.get(2))) {
+ while (dns.get(2).isDatanodeUp()) {
Thread.sleep(1000);
}
// and report two failed volumes
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java Fri Apr 29 18:16:32 2011
@@ -98,7 +98,7 @@ public class TestDatanodeRestart {
out.write(writeBuf);
out.hflush();
DataNode dn = cluster.getDataNodes().get(0);
- for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) {
+ for (FSVolume volume : ((FSDataset)dn.data).volumes.getVolumes()) {
File currentDir = volume.getDir().getParentFile();
File rbwDir = new File(currentDir, "rbw");
for (File file : rbwDir.listFiles()) {
@@ -112,16 +112,17 @@ public class TestDatanodeRestart {
dn = cluster.getDataNodes().get(0);
// check volumeMap: one rwr replica
+ String bpid = cluster.getNamesystem().getBlockPoolId();
ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
- Assert.assertEquals(1, replicas.size());
- ReplicaInfo replica = replicas.replicas().iterator().next();
+ Assert.assertEquals(1, replicas.size(bpid));
+ ReplicaInfo replica = replicas.replicas(bpid).iterator().next();
Assert.assertEquals(ReplicaState.RWR, replica.getState());
if (isCorrupt) {
Assert.assertEquals((fileLen-1)/512*512, replica.getNumBytes());
} else {
Assert.assertEquals(fileLen, replica.getNumBytes());
}
- dn.data.invalidate(new Block[]{replica});
+ dn.data.invalidate(bpid, new Block[]{replica});
} finally {
IOUtils.closeStream(out);
if (fs.exists(src)) {
@@ -146,9 +147,10 @@ public class TestDatanodeRestart {
DFSTestUtil.createFile(fs, fileName, 1, (short)1, 0L);
DFSTestUtil.waitReplication(fs, fileName, (short)1);
}
+ String bpid = cluster.getNamesystem().getBlockPoolId();
DataNode dn = cluster.getDataNodes().get(0);
Iterator<ReplicaInfo> replicasItor =
- ((FSDataset)dn.data).volumeMap.replicas().iterator();
+ ((FSDataset)dn.data).volumeMap.replicas(bpid).iterator();
ReplicaInfo replica = replicasItor.next();
createUnlinkTmpFile(replica, true, true); // rename block file
createUnlinkTmpFile(replica, false, true); // rename meta file
@@ -165,7 +167,7 @@ public class TestDatanodeRestart {
// check volumeMap: 4 finalized replica
Collection<ReplicaInfo> replicas =
- ((FSDataset)(dn.data)).volumeMap.replicas();
+ ((FSDataset)(dn.data)).volumeMap.replicas(bpid);
Assert.assertEquals(4, replicas.size());
replicasItor = replicas.iterator();
while (replicasItor.hasNext()) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Fri Apr 29 18:16:32 2011
@@ -21,6 +21,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -48,6 +50,7 @@ public class TestDirectoryScanner extend
private static final int DEFAULT_GEN_STAMP = 9999;
private MiniDFSCluster cluster;
+ private String bpid;
private FSDataset fds = null;
private DirectoryScanner scanner = null;
private Random rand = new Random();
@@ -69,7 +72,7 @@ public class TestDirectoryScanner extend
/** Truncate a block file */
private long truncateBlockFile() throws IOException {
synchronized (fds) {
- for (ReplicaInfo b : fds.volumeMap.replicas()) {
+ for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
File f = b.getBlockFile();
File mf = b.getMetaFile();
// Truncate a block file that has a corresponding metadata file
@@ -88,7 +91,7 @@ public class TestDirectoryScanner extend
/** Delete a block file */
private long deleteBlockFile() {
synchronized(fds) {
- for (ReplicaInfo b : fds.volumeMap.replicas()) {
+ for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
File f = b.getBlockFile();
File mf = b.getMetaFile();
// Delete a block file that has corresponding metadata file
@@ -104,7 +107,7 @@ public class TestDirectoryScanner extend
/** Delete block meta file */
private long deleteMetaFile() {
synchronized(fds) {
- for (ReplicaInfo b : fds.volumeMap.replicas()) {
+ for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
File file = b.getMetaFile();
// Delete a metadata file
if (file.exists() && file.delete()) {
@@ -121,7 +124,7 @@ public class TestDirectoryScanner extend
long id = rand.nextLong();
while (true) {
id = rand.nextLong();
- if (fds.fetchReplicaInfo(id) == null) {
+ if (fds.fetchReplicaInfo(bpid, id) == null) {
break;
}
}
@@ -139,10 +142,11 @@ public class TestDirectoryScanner extend
/** Create a block file in a random volume*/
private long createBlockFile() throws IOException {
- FSVolume[] volumes = fds.volumes.volumes;
- int index = rand.nextInt(volumes.length - 1);
+ List<FSVolume> volumes = fds.volumes.getVolumes();
+ int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId();
- File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+ File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
+ File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
}
@@ -151,10 +155,11 @@ public class TestDirectoryScanner extend
/** Create a metafile in a random volume*/
private long createMetaFile() throws IOException {
- FSVolume[] volumes = fds.volumes.volumes;
- int index = rand.nextInt(volumes.length - 1);
+ List<FSVolume> volumes = fds.volumes.getVolumes();
+ int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId();
- File file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+ File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
+ File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
}
@@ -163,10 +168,11 @@ public class TestDirectoryScanner extend
/** Create block file and corresponding metafile in a random volume */
private long createBlockMetaFile() throws IOException {
- FSVolume[] volumes = fds.volumes.volumes;
- int index = rand.nextInt(volumes.length - 1);
+ List<FSVolume> volumes = fds.volumes.getVolumes();
+ int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId();
- File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+ File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
+ File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
@@ -185,7 +191,7 @@ public class TestDirectoryScanner extend
LOG.info("Created extraneous file " + name2);
}
- file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+ file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
}
@@ -196,12 +202,18 @@ public class TestDirectoryScanner extend
private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
long missingMemoryBlocks, long mismatchBlocks) {
scanner.reconcile();
- assertEquals(totalBlocks, scanner.totalBlocks);
- assertEquals(diffsize, scanner.diff.size());
- assertEquals(missingMetaFile, scanner.missingMetaFile);
- assertEquals(missingBlockFile, scanner.missingBlockFile);
- assertEquals(missingMemoryBlocks, scanner.missingMemoryBlocks);
- assertEquals(mismatchBlocks, scanner.mismatchBlocks);
+
+ assertTrue(scanner.diffs.containsKey(bpid));
+ LinkedList<DirectoryScanner.ScanInfo> diff = scanner.diffs.get(bpid);
+ assertTrue(scanner.stats.containsKey(bpid));
+ DirectoryScanner.Stats stats = scanner.stats.get(bpid);
+
+ assertEquals(diffsize, diff.size());
+ assertEquals(totalBlocks, stats.totalBlocks);
+ assertEquals(missingMetaFile, stats.missingMetaFile);
+ assertEquals(missingBlockFile, stats.missingBlockFile);
+ assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
+ assertEquals(mismatchBlocks, stats.mismatchBlocks);
}
public void testDirectoryScanner() throws Exception {
@@ -215,10 +227,12 @@ public class TestDirectoryScanner extend
cluster = new MiniDFSCluster.Builder(CONF).build();
try {
cluster.waitActive();
+ bpid = cluster.getNamesystem().getBlockPoolId();
fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism);
scanner = new DirectoryScanner(fds, CONF);
+ scanner.setRetainDiffs(true);
// Add files with 100 blocks
createFile("/tmp/t1", 10000);
@@ -318,19 +332,26 @@ public class TestDirectoryScanner extend
truncateBlockFile();
scan(totalBlocks+3, 6, 2, 2, 3, 2);
scan(totalBlocks+1, 0, 0, 0, 0, 0);
+
+ // Test14: validate clean shutdown of DirectoryScanner
+ ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim
+ scanner.shutdown();
+ assertFalse(scanner.getRunStatus());
+
} finally {
+ scanner.shutdown();
cluster.shutdown();
}
}
private void verifyAddition(long blockId, long genStamp, long size) {
final ReplicaInfo replicainfo;
- replicainfo = fds.fetchReplicaInfo(blockId);
+ replicainfo = fds.fetchReplicaInfo(bpid, blockId);
assertNotNull(replicainfo);
// Added block has the same file as the one created by the test
File file = new File(getBlockFile(blockId));
- assertEquals(file.getName(), fds.findBlockFile(blockId).getName());
+ assertEquals(file.getName(), fds.findBlockFile(bpid, blockId).getName());
// Generation stamp is same as that of created file
assertEquals(genStamp, replicainfo.getGenerationStamp());
@@ -341,12 +362,12 @@ public class TestDirectoryScanner extend
private void verifyDeletion(long blockId) {
// Ensure block does not exist in memory
- assertNull(fds.fetchReplicaInfo(blockId));
+ assertNull(fds.fetchReplicaInfo(bpid, blockId));
}
private void verifyGenStamp(long blockId, long genStamp) {
final ReplicaInfo memBlock;
- memBlock = fds.fetchReplicaInfo(blockId);
+ memBlock = fds.fetchReplicaInfo(bpid, blockId);
assertNotNull(memBlock);
assertEquals(genStamp, memBlock.getGenerationStamp());
}
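After this change DirectoryScanner keeps its results keyed by block pool id, so assertions read through per-pool maps rather than scalar fields. A condensed sketch of the new verification path (scanner, diffs, stats, and setRetainDiffs come from the patch; expectedTotal and expectedDiff are placeholder values):

scanner.setRetainDiffs(true); // keep per-pool results around for inspection
scanner.reconcile();
LinkedList<DirectoryScanner.ScanInfo> diff = scanner.diffs.get(bpid);
DirectoryScanner.Stats stats = scanner.stats.get(bpid);
assertEquals(expectedDiff, diff.size());
assertEquals(expectedTotal, stats.totalBlocks);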
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Fri Apr 29 18:16:32 2011
@@ -51,7 +51,6 @@ public class TestDiskError {
private FileSystem fs;
private MiniDFSCluster cluster;
private Configuration conf;
- private String dataDir;
@Before
public void setUp() throws Exception {
@@ -60,7 +59,6 @@ public class TestDiskError {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
- dataDir = cluster.getDataDirectory();
}
@After
@@ -86,8 +84,11 @@ public class TestDiskError {
cluster.startDataNodes(conf, 2, true, null, null);
cluster.waitActive();
final int dnIndex = 0;
- File dir1 = new File(new File(dataDir, "data"+(2*dnIndex+1)), "current/rbw");
- File dir2 = new File(new File(dataDir, "data"+(2*dnIndex+2)), "current/rbw");
+ String bpid = cluster.getNamesystem().getBlockPoolId();
+ File storageDir = MiniDFSCluster.getStorageDir(dnIndex, 0);
+ File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
+ storageDir = MiniDFSCluster.getStorageDir(dnIndex, 1);
+ File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
try {
// make the data directory of the first datanode read-only
assertTrue("Couldn't chmod local vol", dir1.setReadOnly());
@@ -95,7 +96,7 @@ public class TestDiskError {
// create files and make sure that first datanode will be down
DataNode dn = cluster.getDataNodes().get(dnIndex);
- for (int i=0; DataNode.isDatanodeUp(dn); i++) {
+ for (int i=0; dn.isDatanodeUp(); i++) {
Path fileName = new Path("/test.txt"+i);
DFSTestUtil.createFile(fs, fileName, 1024, (short)2, 1L);
DFSTestUtil.waitReplication(fs, fileName, (short)2);
@@ -152,9 +153,11 @@ public class TestDiskError {
out.close();
// the temporary block & meta files should be deleted
- String dataDir = cluster.getDataDirectory();
- File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "current/rbw");
- File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "current/rbw");
+ String bpid = cluster.getNamesystem().getBlockPoolId();
+ File storageDir = MiniDFSCluster.getStorageDir(sndNode, 0);
+ File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
+ storageDir = MiniDFSCluster.getStorageDir(sndNode, 1);
+ File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
Thread.sleep(100);
}
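The old test derived replica directories from hard-coded "data"+N paths; with
the per-block-pool layout those paths no longer resolve, so MiniDFSCluster now
exposes the mapping. A short sketch of the replacement idiom, assuming a
running cluster and a datanode index as in the surrounding tests:

    String bpid = cluster.getNamesystem().getBlockPoolId();
    // storage dirs 0 and 1 are the two configured data dirs of datanode dnIndex
    File storageDir = MiniDFSCluster.getStorageDir(dnIndex, 0);
    File rbw1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
    storageDir = MiniDFSCluster.getStorageDir(dnIndex, 1);
    File rbw2 = MiniDFSCluster.getRbwDir(storageDir, bpid);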
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Fri Apr 29 18:16:32 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -46,8 +47,8 @@ import org.junit.Test;
* This tests InterDatanodeProtocol for block handling.
*/
public class TestInterDatanodeProtocol {
- public static void checkMetaInfo(Block b, DataNode dn) throws IOException {
- Block metainfo = dn.data.getStoredBlock(b.getBlockId());
+ public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
+ Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
Assert.assertEquals(b.getBlockId(), metainfo.getBlockId());
Assert.assertEquals(b.getNumBytes(), metainfo.getNumBytes());
}
@@ -97,10 +98,12 @@ public class TestInterDatanodeProtocol {
assertTrue(datanode != null);
//stop block scanner, so we could compare lastScanTime
- datanode.blockScannerThread.interrupt();
+ if (datanode.blockScanner != null) {
+ datanode.blockScanner.shutdown();
+ }
//verify BlockMetaDataInfo
- Block b = locatedblock.getBlock();
+ ExtendedBlock b = locatedblock.getBlock();
InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass());
checkMetaInfo(b, datanode);
long recoveryId = b.getGenerationStamp() + 1;
@@ -108,7 +111,7 @@ public class TestInterDatanodeProtocol {
new RecoveringBlock(b, locatedblock.getLocations(), recoveryId));
//verify updateBlock
- Block newblock = new Block(
+ ExtendedBlock newblock = new ExtendedBlock(b.getBlockPoolId(),
b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1);
idp.updateReplicaUnderRecovery(b, recoveryId, newblock.getNumBytes());
checkMetaInfo(newblock, datanode);
@@ -129,44 +132,47 @@ public class TestInterDatanodeProtocol {
Assert.assertEquals(originalInfo.getState(), recoveryInfo.getOriginalReplicaState());
}
- /** Test {@link FSDataset#initReplicaRecovery(ReplicasMap, Block, long)} */
+ /** Test
+ * {@link FSDataset#initReplicaRecovery(String, ReplicasMap, Block, long)}
+ */
@Test
public void testInitReplicaRecovery() throws IOException {
final long firstblockid = 10000L;
final long gs = 7777L;
final long length = 22L;
- final ReplicasMap map = new ReplicasMap();
+ final ReplicasMap map = new ReplicasMap(this);
+ String bpid = "BP-TEST";
final Block[] blocks = new Block[5];
for(int i = 0; i < blocks.length; i++) {
blocks[i] = new Block(firstblockid + i, length, gs);
- map.add(createReplicaInfo(blocks[i]));
+ map.add(bpid, createReplicaInfo(blocks[i]));
}
{
//normal case
final Block b = blocks[0];
- final ReplicaInfo originalInfo = map.get(b);
+ final ReplicaInfo originalInfo = map.get(bpid, b);
final long recoveryid = gs + 1;
- final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid);
+ final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid);
assertEquals(originalInfo, recoveryInfo);
- final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(b);
+ final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
Assert.assertEquals(originalInfo.getBlockId(), updatedInfo.getBlockId());
Assert.assertEquals(recoveryid, updatedInfo.getRecoveryID());
//recover one more time
final long recoveryid2 = gs + 2;
- final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid2);
+ final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
assertEquals(originalInfo, recoveryInfo2);
- final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(b);
+ final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
Assert.assertEquals(originalInfo.getBlockId(), updatedInfo2.getBlockId());
Assert.assertEquals(recoveryid2, updatedInfo2.getRecoveryID());
//case RecoveryInProgressException
try {
- FSDataset.initReplicaRecovery(map, b, recoveryid);
+ FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
Assert.fail();
}
catch(RecoveryInProgressException ripe) {
@@ -177,7 +183,7 @@ public class TestInterDatanodeProtocol {
{ // BlockRecoveryFI_01: replica not found
final long recoveryid = gs + 1;
final Block b = new Block(firstblockid - 1, length, gs);
- ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(map, b, recoveryid);
+ ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
Assert.assertNull("Data-node should not have this replica.", r);
}
@@ -185,7 +191,7 @@ public class TestInterDatanodeProtocol {
final long recoveryid = gs - 1;
final Block b = new Block(firstblockid + 1, length, gs);
try {
- FSDataset.initReplicaRecovery(map, b, recoveryid);
+ FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
Assert.fail();
}
catch(IOException ioe) {
@@ -198,7 +204,7 @@ public class TestInterDatanodeProtocol {
final long recoveryid = gs + 1;
final Block b = new Block(firstblockid, length, gs+1);
try {
- FSDataset.initReplicaRecovery(map, b, recoveryid);
+ FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
fail("InitReplicaRecovery should fail because replica's " +
"gs is less than the block's gs");
} catch (IOException e) {
@@ -208,7 +214,10 @@ public class TestInterDatanodeProtocol {
}
}
- /** Test {@link FSDataset#updateReplicaUnderRecovery(ReplicaUnderRecovery, long, long)} */
+ /**
+ * Test for
+ * {@link FSDataset#updateReplicaUnderRecovery(ExtendedBlock, long, long)}
+ */
@Test
public void testUpdateReplicaUnderRecovery() throws IOException {
final Configuration conf = new HdfsConfiguration();
@@ -217,6 +226,7 @@ public class TestInterDatanodeProtocol {
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
+ String bpid = cluster.getNamesystem().getBlockPoolId();
//create a file
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
@@ -237,14 +247,14 @@ public class TestInterDatanodeProtocol {
final FSDataset fsdataset = (FSDataset)datanode.data;
//initReplicaRecovery
- final Block b = locatedblock.getBlock();
+ final ExtendedBlock b = locatedblock.getBlock();
final long recoveryid = b.getGenerationStamp() + 1;
final long newlength = b.getNumBytes() - 1;
final ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery(
new RecoveringBlock(b, null, recoveryid));
//check replica
- final ReplicaInfo replica = fsdataset.fetchReplicaInfo(b.getBlockId());
+ final ReplicaInfo replica = fsdataset.fetchReplicaInfo(bpid, b.getBlockId());
Assert.assertEquals(ReplicaState.RUR, replica.getState());
//check meta data before update
@@ -254,8 +264,8 @@ public class TestInterDatanodeProtocol {
//with (block length) != (stored replica's on disk length).
{
//create a block with same id and gs but different length.
- final Block tmp = new Block(rri.getBlockId(), rri.getNumBytes() - 1,
- rri.getGenerationStamp());
+ final ExtendedBlock tmp = new ExtendedBlock(b.getBlockPoolId(), rri
+ .getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp());
try {
//update should fail
fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, newlength);
@@ -267,7 +277,7 @@ public class TestInterDatanodeProtocol {
//update
final ReplicaInfo finalized = fsdataset.updateReplicaUnderRecovery(
- rri, recoveryid, newlength);
+ new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength);
//check meta data after update
FSDataset.checkReplicaFiles(finalized);
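The common thread in this file is Block -> ExtendedBlock: an ExtendedBlock is a
Block qualified by a block pool id, and dataset lookups now take both ids. A
minimal sketch using the constructors that appear in this patch (the literal
values and the dn/fsdataset/rri references are assumptions carried over from
the tests above):

    String bpid = "BP-TEST";
    ExtendedBlock eb = new ExtendedBlock(bpid, 10000L, 22L, 7777L);
    // lookups are qualified by pool id as well as block id:
    Block stored = dn.data.getStoredBlock(eb.getBlockPoolId(), eb.getBlockId());
    ReplicaInfo replica = fsdataset.fetchReplicaInfo(bpid, eb.getBlockId());
    // wrapping an existing recovery info in a pool-qualified block:
    ExtendedBlock wrapped = new ExtendedBlock(bpid, rri);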
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java Fri Apr 29 18:16:32 2011
@@ -26,12 +26,13 @@ import org.junit.Test;
* Unit test for ReplicasMap class
*/
public class TestReplicasMap {
- private static final ReplicasMap map = new ReplicasMap();
+ private static final ReplicasMap map = new ReplicasMap(TestReplicasMap.class);
+ private static final String bpid = "BP-TEST";
private static final Block block = new Block(1234, 1234, 1234);
@BeforeClass
public static void setup() {
- map.add(new FinalizedReplica(block, null, null));
+ map.add(bpid, new FinalizedReplica(block, null, null));
}
/**
@@ -41,35 +42,35 @@ public class TestReplicasMap {
public void testGet() {
// Test 1: null argument throws invalid argument exception
try {
- map.get(null);
+ map.get(bpid, null);
fail("Expected exception not thrown");
} catch (IllegalArgumentException expected) { }
// Test 2: successful lookup based on block
- assertNotNull(map.get(block));
+ assertNotNull(map.get(bpid, block));
// Test 3: Lookup failure - generation stamp mismatch
Block b = new Block(block);
b.setGenerationStamp(0);
- assertNull(map.get(b));
+ assertNull(map.get(bpid, b));
// Test 4: Lookup failure - blockID mismatch
b.setGenerationStamp(block.getGenerationStamp());
b.setBlockId(0);
- assertNull(map.get(b));
+ assertNull(map.get(bpid, b));
// Test 5: successful lookup based on block ID
- assertNotNull(map.get(block.getBlockId()));
+ assertNotNull(map.get(bpid, block.getBlockId()));
// Test 6: failed lookup for invalid block ID
- assertNull(map.get(0));
+ assertNull(map.get(bpid, 0));
}
@Test
public void testAdd() {
// Test 1: null argument throws invalid argument exception
try {
- map.add(null);
+ map.add(bpid, null);
fail("Expected exception not thrown");
} catch (IllegalArgumentException expected) { }
}
@@ -78,28 +79,28 @@ public class TestReplicasMap {
public void testRemove() {
// Test 1: null argument throws invalid argument exception
try {
- map.remove(null);
+ map.remove(bpid, null);
fail("Expected exception not thrown");
} catch (IllegalArgumentException expected) { }
// Test 2: remove failure - generation stamp mismatch
Block b = new Block(block);
b.setGenerationStamp(0);
- assertNull(map.remove(b));
+ assertNull(map.remove(bpid, b));
// Test 3: remove failure - blockID mismatch
b.setGenerationStamp(block.getGenerationStamp());
b.setBlockId(0);
- assertNull(map.remove(b));
+ assertNull(map.remove(bpid, b));
// Test 4: remove success
- assertNotNull(map.remove(block));
+ assertNotNull(map.remove(bpid, block));
// Test 5: remove failure - invalid blockID
- assertNull(map.remove(0));
+ assertNull(map.remove(bpid, 0));
// Test 6: remove success
- map.add(new FinalizedReplica(block, null, null));
- assertNotNull(map.remove(block.getBlockId()));
+ map.add(bpid, new FinalizedReplica(block, null, null));
+ assertNotNull(map.remove(bpid, block.getBlockId()));
}
}
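ReplicasMap is now partitioned by block pool: the constructor takes an object
(apparently used for synchronization; an inference from the (this) and
(TestReplicasMap.class) arguments above), and every get/add/remove takes a
bpid as its first argument, throwing IllegalArgumentException on null. A short
sketch mirroring the tests above:

    ReplicasMap map = new ReplicasMap(this);   // 'this' supplies the lock object
    String bpid = "BP-TEST";
    Block block = new Block(1234, 1234, 1234);
    map.add(bpid, new FinalizedReplica(block, null, null));
    assertNotNull(map.get(bpid, block));             // pool-qualified lookup
    assertNotNull(map.remove(bpid, block.getBlockId()));
    assertNull(map.get(bpid, block));                // gone after removal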
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java Fri Apr 29 18:16:32 2011
@@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import junit.framework.Assert;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.util.ReflectionUtils;
@@ -29,28 +32,28 @@ public class TestRoundRobinVolumesPolicy
// Test the Round-Robin block-volume choosing algorithm.
@Test
public void testRR() throws Exception {
- FSVolume[] volumes = new FSVolume[2];
+ final List<FSVolume> volumes = new ArrayList<FSVolume>();
// First volume, with 100 bytes of space.
- volumes[0] = Mockito.mock(FSVolume.class);
- Mockito.when(volumes[0].getAvailable()).thenReturn(100L);
+ volumes.add(Mockito.mock(FSVolume.class));
+ Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
// Second volume, with 200 bytes of space.
- volumes[1] = Mockito.mock(FSVolume.class);
- Mockito.when(volumes[1].getAvailable()).thenReturn(200L);
+ volumes.add(Mockito.mock(FSVolume.class));
+ Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance(
RoundRobinVolumesPolicy.class, null);
// Test two rounds of round-robin choosing
- Assert.assertEquals(volumes[0], policy.chooseVolume(volumes, 0));
- Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 0));
- Assert.assertEquals(volumes[0], policy.chooseVolume(volumes, 0));
- Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 0));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
// The first volume has only 100 bytes available, so the policy should
// choose the second one when more space is requested.
- Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 150));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150));
// Fail when no volume has enough space for the request.
try {