You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/09/18 17:12:53 UTC
svn commit: r1524444 [2/3] - in
/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/or...
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Sep 18 15:12:52 2013
@@ -141,6 +141,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -2495,14 +2496,8 @@ public class FSNamesystem implements Nam
}
// choose targets for the new block to be allocated.
- // TODO: chooseTarget(..) should be changed to return DatanodeStorageInfo's
- final DatanodeDescriptor chosenDatanodes[] = getBlockManager().chooseTarget(
+ final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget(
src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
- final DatanodeStorageInfo[] targets = new DatanodeStorageInfo[chosenDatanodes.length];
- for(int i = 0; i < targets.length; i++) {
- final DatanodeDescriptor dd = chosenDatanodes[i];
- targets[i] = dd.getStorageInfos().iterator().next();
- }
// Part II.
// Allocate a new block, add it to the INode and the BlocksMap.
@@ -2644,7 +2639,7 @@ public class FSNamesystem implements Nam
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
long offset) throws IOException {
- LocatedBlock lBlk = LocatedBlock.createLocatedBlock(
+ LocatedBlock lBlk = new LocatedBlock(
getExtendedBlock(blk), locs, offset, false);
getBlockManager().setBlockToken(
lBlk, BlockTokenSecretManager.AccessMode.WRITE);
@@ -2653,7 +2648,8 @@ public class FSNamesystem implements Nam
/** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk,
- final DatanodeInfo[] existings, final Set<Node> excludes,
+ final DatanodeInfo[] existings, final String[] storageIDs,
+ final Set<Node> excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
//check if the feature is enabled
@@ -2661,7 +2657,7 @@ public class FSNamesystem implements Nam
final DatanodeDescriptor clientnode;
final long preferredblocksize;
- final List<DatanodeDescriptor> chosen;
+ final List<DatanodeStorageInfo> chosen;
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
@@ -2679,23 +2675,18 @@ public class FSNamesystem implements Nam
clientnode = file.getClientNode();
preferredblocksize = file.getPreferredBlockSize();
- //find datanode descriptors
- chosen = new ArrayList<DatanodeDescriptor>();
- for(DatanodeInfo d : existings) {
- final DatanodeDescriptor descriptor = blockManager.getDatanodeManager(
- ).getDatanode(d);
- if (descriptor != null) {
- chosen.add(descriptor);
- }
- }
+ //find datanode storages
+ final DatanodeManager dm = blockManager.getDatanodeManager();
+ chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs));
} finally {
readUnlock();
}
// choose new datanodes.
- final DatanodeInfo[] targets = blockManager.getBlockPlacementPolicy(
+ final DatanodeStorageInfo[] targets = blockManager.getBlockPlacementPolicy(
).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
- excludes, preferredblocksize);
+ // TODO: get storage type from the file
+ excludes, preferredblocksize, StorageType.DEFAULT);
final LocatedBlock lb = new LocatedBlock(blk, targets);
blockManager.setBlockToken(lb, AccessMode.COPY);
return lb;
@@ -5634,9 +5625,9 @@ public class FSNamesystem implements Nam
for (int i = 0; i < blocks.length; i++) {
ExtendedBlock blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
+ String[] storageIDs = blocks[i].getStorageIDs();
for (int j = 0; j < nodes.length; j++) {
- //TODO: add "storageID to LocatedBlock
- blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], "STORAGE_ID",
+ blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], storageIDs[j],
"client machine reported it");
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Sep 18 15:12:52 2013
@@ -566,7 +566,8 @@ class NameNodeRpcServer implements Namen
@Override // ClientProtocol
public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
- final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+ final DatanodeInfo[] existings, final String[] existingStorageIDs,
+ final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
if (LOG.isDebugEnabled()) {
@@ -587,8 +588,8 @@ class NameNodeRpcServer implements Namen
excludeSet.add(node);
}
}
- return namesystem.getAdditionalDatanode(src, blk,
- existings, excludeSet, numAdditionalNodes, clientName);
+ return namesystem.getAdditionalDatanode(src, blk, existings,
+ existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
}
/**
* The client needs to give up on the block.
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Wed Sep 18 15:12:52 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -158,7 +160,7 @@ public class NamenodeWebHdfsMethods {
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize, Configuration conf) throws IOException {
+ final long blocksize, final Configuration conf) throws IOException {
final BlockManager bm = namenode.getNamesystem().getBlockManager();
if (op == PutOpParam.Op.CREATE) {
@@ -166,11 +168,13 @@ public class NamenodeWebHdfsMethods {
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
).getDatanodeByHost(getRemoteAddress());
if (clientNode != null) {
- final DatanodeDescriptor[] datanodes = bm.getBlockPlacementPolicy()
+ final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
.chooseTarget(path, 1, clientNode,
- new ArrayList<DatanodeDescriptor>(), false, null, blocksize);
- if (datanodes.length > 0) {
- return datanodes[0];
+ new ArrayList<DatanodeStorageInfo>(), false, null, blocksize,
+ // TODO: get storage type from the file
+ StorageType.DEFAULT);
+ if (storages.length > 0) {
+ return storages[0].getDatanodeDescriptor();
}
}
} else if (op == GetOpParam.Op.OPEN
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Wed Sep 18 15:12:52 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
/****************************************************
* A BlockCommand is an instruction to a datanode
@@ -46,9 +47,10 @@ public class BlockCommand extends Datano
*/
public static final long NO_ACK = Long.MAX_VALUE;
- String poolId;
- Block blocks[];
- DatanodeInfo targets[][];
+ final String poolId;
+ final Block[] blocks;
+ final DatanodeInfo[][] targets;
+ final String[][] targetStorageIDs;
/**
* Create BlockCommand for transferring blocks to another datanode
@@ -60,21 +62,26 @@ public class BlockCommand extends Datano
this.poolId = poolId;
blocks = new Block[blocktargetlist.size()];
targets = new DatanodeInfo[blocks.length][];
+ targetStorageIDs = new String[blocks.length][];
+
for(int i = 0; i < blocks.length; i++) {
BlockTargetPair p = blocktargetlist.get(i);
blocks[i] = p.block;
- targets[i] = p.targets;
+ targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
+ targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
}
}
- private static final DatanodeInfo[][] EMPTY_TARGET = {};
+ private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
+ private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
/**
* Create BlockCommand for the given action
* @param blocks blocks related to the action
*/
public BlockCommand(int action, String poolId, Block blocks[]) {
- this(action, poolId, blocks, EMPTY_TARGET);
+ this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
+ EMPTY_TARGET_STORAGEIDS);
}
/**
@@ -82,11 +89,12 @@ public class BlockCommand extends Datano
* @param blocks blocks related to the action
*/
public BlockCommand(int action, String poolId, Block[] blocks,
- DatanodeInfo[][] targets) {
+ DatanodeInfo[][] targets, String[][] targetStorageIDs) {
super(action);
this.poolId = poolId;
this.blocks = blocks;
this.targets = targets;
+ this.targetStorageIDs = targetStorageIDs;
}
public String getBlockPoolId() {
@@ -100,4 +108,8 @@ public class BlockCommand extends Datano
public DatanodeInfo[][] getTargets() {
return targets;
}
+
+ public String[][] getTargetStorageIDs() {
+ return targetStorageIDs;
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Wed Sep 18 15:12:52 2013
@@ -141,6 +141,7 @@ message GetAdditionalDatanodeRequestProt
repeated DatanodeInfoProto excludes = 4;
required uint32 numAdditionalNodes = 5;
required string clientName = 6;
+ repeated string existingStorageIDs = 7;
}
message GetAdditionalDatanodeResponseProto {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Wed Sep 18 15:12:52 2013
@@ -105,10 +105,12 @@ message BlockCommandProto {
INVALIDATE = 2; // Invalidate blocks
SHUTDOWN = 3; // Shutdown the datanode
}
+
required Action action = 1;
required string blockPoolId = 2;
repeated BlockProto blocks = 3;
repeated DatanodeInfosProto targets = 4;
+ repeated StorageIDsProto targetStorageIDs = 5;
}
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Wed Sep 18 15:12:52 2013
@@ -122,6 +122,13 @@ enum StorageTypeProto {
}
/**
+ * A list of storage IDs.
+ */
+message StorageIDsProto {
+ repeated string storageIDs = 1;
+}
+
+/**
* A LocatedBlock gives information about a block and its location.
*/
message LocatedBlockProto {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Sep 18 15:12:52 2013
@@ -860,9 +860,35 @@ public class DFSTestUtil {
public static DatanodeStorageInfo createDatanodeStorageInfo(
String storageID, String ip) {
+ return createDatanodeStorageInfo(storageID, ip, "defaultRack");
+ }
+ public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks) {
+ return createDatanodeStorageInfos(racks.length, racks);
+ }
+ public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n, String... racks) {
+ DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
+ for(int i = storages.length; i > 0; ) {
+ final String storageID = "s" + i;
+ final String ip = i + "." + i + "." + i + "." + i;
+ i--;
+ final String rack = i < racks.length? racks[i]: "defaultRack";
+ storages[i] = createDatanodeStorageInfo(storageID, ip, rack);
+ }
+ return storages;
+ }
+ public static DatanodeStorageInfo createDatanodeStorageInfo(
+ String storageID, String ip, String rack) {
+ final DatanodeStorage storage = new DatanodeStorage(storageID);
return new DatanodeStorageInfo(
- getDatanodeDescriptor(ip, "defaultRack"),
- new DatanodeStorage(storageID));
+ BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage), storage);
+ }
+ public static DatanodeDescriptor[] toDatanodeDescriptor(
+ DatanodeStorageInfo[] storages) {
+ DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length];
+ for(int i = 0; i < datanodes.length; i++) {
+ datanodes[i] = storages[i].getDatanodeDescriptor();
+ }
+ return datanodes;
}
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Wed Sep 18 15:12:52 2013
@@ -539,8 +539,9 @@ public class TestPBHelper {
dnInfos[0][0] = DFSTestUtil.getLocalDatanodeInfo();
dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
+ String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
- blocks, dnInfos);
+ blocks, dnInfos, storageIDs);
BlockCommandProto bcProto = PBHelper.convert(bc);
BlockCommand bc2 = PBHelper.convert(bcProto);
assertEquals(bc.getAction(), bc2.getAction());
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Wed Sep 18 15:12:52 2013
@@ -227,10 +227,16 @@ public class BlockManagerTestUtil {
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
String rackLocation, boolean initializeStorage) {
+ return getDatanodeDescriptor(ipAddr, rackLocation,
+ initializeStorage? new DatanodeStorage(DatanodeStorage.newStorageID()): null);
+ }
+
+ public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+ String rackLocation, DatanodeStorage storage) {
DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
- if (initializeStorage) {
- dn.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
+ if (storage != null) {
+ dn.updateStorage(storage);
}
return dn;
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Wed Sep 18 15:12:52 2013
@@ -22,10 +22,14 @@ import static org.junit.Assert.assertFal
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
@@ -45,7 +49,6 @@ import org.apache.hadoop.net.NetworkTopo
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import static org.mockito.Mockito.*;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
@@ -53,6 +56,7 @@ import com.google.common.collect.LinkedL
import com.google.common.collect.Lists;
public class TestBlockManager {
+ private DatanodeStorageInfo[] storages;
private List<DatanodeDescriptor> nodes;
private List<DatanodeDescriptor> rackA;
private List<DatanodeDescriptor> rackB;
@@ -81,14 +85,15 @@ public class TestBlockManager {
fsn = Mockito.mock(FSNamesystem.class);
Mockito.doReturn(true).when(fsn).hasWriteLock();
bm = new BlockManager(fsn, fsn, conf);
- nodes = ImmutableList.of(
- BlockManagerTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA", true),
- BlockManagerTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA", true),
- BlockManagerTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA", true),
- BlockManagerTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackB", true),
- BlockManagerTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackB", true),
- BlockManagerTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackB", true)
- );
+ final String[] racks = {
+ "/rackA",
+ "/rackA",
+ "/rackA",
+ "/rackB",
+ "/rackB",
+ "/rackB"};
+ storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+ nodes = Arrays.asList(DFSTestUtil.toDatanodeDescriptor(storages));
rackA = nodes.subList(0, 3);
rackB = nodes.subList(3, 6);
}
@@ -125,17 +130,18 @@ public class TestBlockManager {
}
private void doBasicTest(int testIndex) {
- List<DatanodeDescriptor> origNodes = getNodes(0, 1);
+ List<DatanodeStorageInfo> origStorages = getStorages(0, 1);
+ List<DatanodeDescriptor> origNodes = getNodes(origStorages);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
- DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+ DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
assertEquals(2, pipeline.length);
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
- origNodes.contains(pipeline[0]));
+ origStorages.contains(pipeline[0]));
assertTrue("Destination of replication should be on the other rack. " +
"Was: " + pipeline[1],
- rackB.contains(pipeline[1]));
+ rackB.contains(pipeline[1].getDatanodeDescriptor()));
}
@@ -156,21 +162,22 @@ public class TestBlockManager {
private void doTestTwoOfThreeNodesDecommissioned(int testIndex) throws Exception {
// Block originally on A1, A2, B1
- List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
+ List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
+ List<DatanodeDescriptor> origNodes = getNodes(origStorages);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
// Decommission two of the nodes (A1, A2)
List<DatanodeDescriptor> decomNodes = startDecommission(0, 1);
- DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+ DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
- origNodes.contains(pipeline[0]));
+ origStorages.contains(pipeline[0]));
assertEquals("Should have three targets", 3, pipeline.length);
boolean foundOneOnRackA = false;
for (int i = 1; i < pipeline.length; i++) {
- DatanodeDescriptor target = pipeline[i];
+ DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
if (rackA.contains(target)) {
foundOneOnRackA = true;
}
@@ -199,22 +206,23 @@ public class TestBlockManager {
private void doTestAllNodesHoldingReplicasDecommissioned(int testIndex) throws Exception {
// Block originally on A1, A2, B1
- List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
+ List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
+ List<DatanodeDescriptor> origNodes = getNodes(origStorages);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
// Decommission all of the nodes
List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 3);
- DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+ DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
- origNodes.contains(pipeline[0]));
+ origStorages.contains(pipeline[0]));
assertEquals("Should have three targets", 4, pipeline.length);
boolean foundOneOnRackA = false;
boolean foundOneOnRackB = false;
for (int i = 1; i < pipeline.length; i++) {
- DatanodeDescriptor target = pipeline[i];
+ DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
if (rackA.contains(target)) {
foundOneOnRackA = true;
} else if (rackB.contains(target)) {
@@ -251,21 +259,22 @@ public class TestBlockManager {
private void doTestOneOfTwoRacksDecommissioned(int testIndex) throws Exception {
// Block originally on A1, A2, B1
- List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
+ List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
+ List<DatanodeDescriptor> origNodes = getNodes(origStorages);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
// Decommission all of the nodes in rack A
List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 2);
- DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+ DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
- origNodes.contains(pipeline[0]));
+ origStorages.contains(pipeline[0]));
assertEquals("Should have three targets", 3, pipeline.length);
boolean foundOneOnRackB = false;
for (int i = 1; i < pipeline.length; i++) {
- DatanodeDescriptor target = pipeline[i];
+ DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
if (rackB.contains(target)) {
foundOneOnRackB = true;
}
@@ -287,9 +296,9 @@ public class TestBlockManager {
rackCNode.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
addNodes(ImmutableList.of(rackCNode));
try {
- DatanodeDescriptor[] pipeline2 = scheduleSingleReplication(blockInfo);
+ DatanodeStorageInfo[] pipeline2 = scheduleSingleReplication(blockInfo);
assertEquals(2, pipeline2.length);
- assertEquals(rackCNode, pipeline2[1]);
+ assertEquals(rackCNode, pipeline2[1].getDatanodeDescriptor());
} finally {
removeNode(rackCNode);
}
@@ -311,15 +320,15 @@ public class TestBlockManager {
// Originally on only nodes in rack A.
List<DatanodeDescriptor> origNodes = rackA;
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
- DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
+ DatanodeStorageInfo pipeline[] = scheduleSingleReplication(blockInfo);
assertEquals(2, pipeline.length); // single new copy
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
- origNodes.contains(pipeline[0]));
+ origNodes.contains(pipeline[0].getDatanodeDescriptor()));
assertTrue("Destination of replication should be on the other rack. " +
"Was: " + pipeline[1],
- rackB.contains(pipeline[1]));
+ rackB.contains(pipeline[1].getDatanodeDescriptor()));
}
@Test
@@ -354,19 +363,11 @@ public class TestBlockManager {
* pipeline.
*/
private void fulfillPipeline(BlockInfo blockInfo,
- DatanodeDescriptor[] pipeline) throws IOException {
+ DatanodeStorageInfo[] pipeline) throws IOException {
for (int i = 1; i < pipeline.length; i++) {
- DatanodeDescriptor dn = pipeline[i];
-
- Iterator<DatanodeStorageInfo> iterator = dn.getStorageInfos().iterator();
- if (iterator.hasNext()) {
- DatanodeStorageInfo storage = iterator.next();
- bm.addBlock(dn, storage.getStorageID(), blockInfo, null);
- blockInfo.addStorage(storage);
- } else {
- throw new RuntimeException("Storage info on node: " + dn.getHostName()
- + " is invalid.");
- }
+ DatanodeStorageInfo storage = pipeline[i];
+ bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null);
+ blockInfo.addStorage(storage);
}
}
@@ -389,6 +390,22 @@ public class TestBlockManager {
}
return ret;
}
+
+ private List<DatanodeDescriptor> getNodes(List<DatanodeStorageInfo> storages) {
+ List<DatanodeDescriptor> ret = Lists.newArrayList();
+ for (DatanodeStorageInfo s : storages) {
+ ret.add(s.getDatanodeDescriptor());
+ }
+ return ret;
+ }
+
+ private List<DatanodeStorageInfo> getStorages(int ... indexes) {
+ List<DatanodeStorageInfo> ret = Lists.newArrayList();
+ for (int idx : indexes) {
+ ret.add(storages[idx]);
+ }
+ return ret;
+ }
private List<DatanodeDescriptor> startDecommission(int ... indexes) {
List<DatanodeDescriptor> nodes = getNodes(indexes);
@@ -407,7 +424,7 @@ public class TestBlockManager {
return blockInfo;
}
- private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
+ private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
// list for priority 1
List<Block> list_p1 = new ArrayList<Block>();
list_p1.add(block);
@@ -425,27 +442,29 @@ public class TestBlockManager {
assertTrue("replication is pending after work is computed",
bm.pendingReplications.getNumReplicas(block) > 0);
- LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
+ LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
assertEquals(1, repls.size());
- Entry<DatanodeDescriptor, BlockTargetPair> repl =
+ Entry<DatanodeStorageInfo, BlockTargetPair> repl =
repls.entries().iterator().next();
- DatanodeDescriptor[] targets = repl.getValue().targets;
+ DatanodeStorageInfo[] targets = repl.getValue().targets;
- DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
+ DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
pipeline[0] = repl.getKey();
System.arraycopy(targets, 0, pipeline, 1, targets.length);
return pipeline;
}
- private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
- LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
+ private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
+ LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
LinkedListMultimap.create();
for (DatanodeDescriptor dn : nodes) {
List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
if (thisRepls != null) {
- repls.putAll(dn, thisRepls);
+ for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
+ repls.putAll(storage, thisRepls);
+ }
}
}
return repls;
@@ -468,7 +487,7 @@ public class TestBlockManager {
addBlockOnNodes(blockId,origNodes.subList(0,1));
List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
- List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
+ List<DatanodeStorageInfo> liveNodes = new LinkedList<DatanodeStorageInfo>();
assertNotNull("Chooses source node for a highest-priority replication"
+ " even if all available source nodes have reached their replication"
@@ -491,7 +510,7 @@ public class TestBlockManager {
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
// Increase the replication count to test replication count > hard limit
- DatanodeDescriptor targets[] = { origNodes.get(1) };
+ DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
assertNull("Does not choose a source node for a highest-priority"
@@ -507,8 +526,7 @@ public class TestBlockManager {
@Test
public void testSafeModeIBR() throws Exception {
DatanodeDescriptor node = spy(nodes.get(0));
- Iterator<DatanodeStorageInfo> i = node.getStorageInfos().iterator();
- DatanodeStorageInfo ds = i.next();
+ DatanodeStorageInfo ds = node.getStorageInfos()[0];
node.setStorageID(ds.getStorageID());
node.isAlive = true;
@@ -552,8 +570,7 @@ public class TestBlockManager {
@Test
public void testSafeModeIBRAfterIncremental() throws Exception {
DatanodeDescriptor node = spy(nodes.get(0));
- Iterator<DatanodeStorageInfo> i = node.getStorageInfos().iterator();
- DatanodeStorageInfo ds = i.next();
+ DatanodeStorageInfo ds = node.getStorageInfos()[0];
node.setStorageID(ds.getStorageID());
node.isAlive = true;
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java Wed Sep 18 15:12:52 2013
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFal
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
-import java.util.Iterator;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -60,9 +59,9 @@ public class TestDatanodeDescriptor {
assertEquals(0, dd.numBlocks());
BlockInfo blk = new BlockInfo(new Block(1L), 1);
BlockInfo blk1 = new BlockInfo(new Block(2L), 2);
- Iterator<DatanodeStorageInfo> iterator = dd.getStorageInfos().iterator();
- assertTrue(iterator.hasNext());
- final String storageID = iterator.next().getStorageID();
+ DatanodeStorageInfo[] storages = dd.getStorageInfos();
+ assertTrue(storages.length > 0);
+ final String storageID = storages[0].getStorageID();
// add first block
assertTrue(dd.addBlock(storageID, blk));
assertEquals(1, dd.numBlocks());
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java Wed Sep 18 15:12:52 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Test;
/**
@@ -62,6 +63,8 @@ public class TestHeartbeatHandling {
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
+ final String storageID = DatanodeStorage.newStorageID();
+ dd.updateStorage(new DatanodeStorage(storageID));
final int REMAINING_BLOCKS = 1;
final int MAX_REPLICATE_LIMIT =
@@ -69,7 +72,7 @@ public class TestHeartbeatHandling {
final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
final int MAX_REPLICATE_BLOCKS = 2*MAX_REPLICATE_LIMIT+REMAINING_BLOCKS;
- final DatanodeDescriptor[] ONE_TARGET = new DatanodeDescriptor[1];
+ final DatanodeStorageInfo[] ONE_TARGET = {dd.getStorageInfo(storageID)};
try {
namesystem.writeLock();
@@ -143,12 +146,15 @@ public class TestHeartbeatHandling {
final DatanodeRegistration nodeReg1 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
+ dd1.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
final DatanodeRegistration nodeReg2 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
+ dd2.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
final DatanodeRegistration nodeReg3 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
+ dd3.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
try {
namesystem.writeLock();
@@ -162,9 +168,9 @@ public class TestHeartbeatHandling {
dd2.setLastUpdate(System.currentTimeMillis());
dd3.setLastUpdate(System.currentTimeMillis());
final DatanodeStorageInfo[] storages = {
- dd1.getStorageInfos().iterator().next(),
- dd2.getStorageInfos().iterator().next(),
- dd3.getStorageInfos().iterator().next()};
+ dd1.getStorageInfos()[0],
+ dd2.getStorageInfos()[0],
+ dd3.getStorageInfos()[0]};
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY, storages);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java Wed Sep 18 15:12:52 2013
@@ -43,8 +43,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.junit.Test;
-import com.google.common.base.Preconditions;
-
/**
* This class tests the internals of PendingReplicationBlocks.java,
* as well as how PendingReplicationBlocks acts in BlockManager
@@ -54,22 +52,7 @@ public class TestPendingReplication {
private static final int DFS_REPLICATION_INTERVAL = 1;
// Number of datanodes in the cluster
private static final int DATANODE_COUNT = 5;
-
- private DatanodeDescriptor genDatanodeId(int seed) {
- seed = seed % 256;
- String ip = seed + "." + seed + "." + seed + "." + seed;
- return DFSTestUtil.getDatanodeDescriptor(ip, null);
- }
- private DatanodeDescriptor[] genDatanodes(int number) {
- Preconditions.checkArgument(number >= 0);
- DatanodeDescriptor[] nodes = new DatanodeDescriptor[number];
- for (int i = 0; i < number; i++) {
- nodes[i] = genDatanodeId(i);
- }
- return nodes;
- }
-
@Test
public void testPendingReplication() {
PendingReplicationBlocks pendingReplications;
@@ -79,9 +62,12 @@ public class TestPendingReplication {
//
// Add 10 blocks to pendingReplications.
//
- for (int i = 0; i < 10; i++) {
+ DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
+ for (int i = 0; i < storages.length; i++) {
Block block = new Block(i, i, 0);
- pendingReplications.increment(block, genDatanodes(i));
+ DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
+ System.arraycopy(storages, 0, targets, 0, i);
+ pendingReplications.increment(block, targets);
}
assertEquals("Size of pendingReplications ",
10, pendingReplications.size());
@@ -91,16 +77,18 @@ public class TestPendingReplication {
// remove one item and reinsert it
//
Block blk = new Block(8, 8, 0);
- pendingReplications.decrement(blk, genDatanodeId(7)); // removes one replica
+ pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor(),
+ storages[7].getStorageID()); // removes one replica
assertEquals("pendingReplications.getNumReplicas ",
7, pendingReplications.getNumReplicas(blk));
for (int i = 0; i < 7; i++) {
// removes all replicas
- pendingReplications.decrement(blk, genDatanodeId(i));
+ pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor(),
+ storages[i].getStorageID());
}
assertTrue(pendingReplications.size() == 9);
- pendingReplications.increment(blk, genDatanodes(8));
+ pendingReplications.increment(blk, DFSTestUtil.createDatanodeStorageInfos(8));
assertTrue(pendingReplications.size() == 10);
//
@@ -128,7 +116,7 @@ public class TestPendingReplication {
for (int i = 10; i < 15; i++) {
Block block = new Block(i, i, 0);
- pendingReplications.increment(block, genDatanodes(i));
+ pendingReplications.increment(block, DFSTestUtil.createDatanodeStorageInfos(i));
}
assertTrue(pendingReplications.size() == 15);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Wed Sep 18 15:12:52 2013
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,6 +36,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -67,6 +70,10 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TestReplicationPolicy {
+ {
+ ((Log4JLogger)BlockPlacementPolicy.LOG).getLogger().setLevel(Level.ALL);
+ }
+
private Random random = DFSUtil.getRandom();
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 6;
@@ -75,7 +82,7 @@ public class TestReplicationPolicy {
private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
private static DatanodeDescriptor dataNodes[];
- private static String[] storageIDs;
+ private static DatanodeStorageInfo[] storages;
// The interval for marking a datanode as stale,
private static long staleInterval =
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
@@ -86,14 +93,15 @@ public class TestReplicationPolicy {
@BeforeClass
public static void setupCluster() throws Exception {
Configuration conf = new HdfsConfiguration();
- dataNodes = new DatanodeDescriptor[] {
- DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
- DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),
- DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2"),
- DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2"),
- DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d2/r3"),
- DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3")
- };
+ final String[] racks = {
+ "/d1/r1",
+ "/d1/r1",
+ "/d1/r2",
+ "/d1/r2",
+ "/d2/r3",
+ "/d2/r3"};
+ storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+ dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
@@ -124,6 +132,13 @@ public class TestReplicationPolicy {
}
}
+ private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
+ return isOnSameRack(left, right.getDatanodeDescriptor());
+ }
+
+ private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
+ return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
+ }
/**
* In this testcase, client is dataNodes[0]. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on
@@ -139,69 +154,69 @@ public class TestReplicationPolicy {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[0]);
+ assertEquals(storages[0], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[0], targets[0]);
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+ assertEquals(storages[0], targets[0]);
+ assertFalse(isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[1], targets[2]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
- assertEquals(targets[0], dataNodes[0]);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
- cluster.isOnSameRack(targets[2], targets[3]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertEquals(storages[0], targets[0]);
+ assertTrue(isOnSameRack(targets[1], targets[2]) ||
+ isOnSameRack(targets[2], targets[3]));
+ assertFalse(isOnSameRack(targets[0], targets[2]));
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
return chooseTarget(numOfReplicas, dataNodes[0]);
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer) {
return chooseTarget(numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>());
+ new ArrayList<DatanodeStorageInfo>());
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- List<DatanodeDescriptor> chosenNodes) {
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+ List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+ DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- List<DatanodeDescriptor> chosenNodes, Set<Node> excludedNodes) {
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+ List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
}
- private static DatanodeDescriptor[] chooseTarget(
+ private static DatanodeStorageInfo[] chooseTarget(
int numOfReplicas,
DatanodeDescriptor writer,
- List<DatanodeDescriptor> chosenNodes,
+ List<DatanodeStorageInfo> chosenNodes,
Set<Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
- false, excludedNodes, BLOCK_SIZE);
+ false, excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
}
/**
@@ -215,8 +230,8 @@ public class TestReplicationPolicy {
@Test
public void testChooseTarget2() throws Exception {
Set<Node> excludedNodes;
- DatanodeDescriptor[] targets;
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ DatanodeStorageInfo[] targets;
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodes[1]);
@@ -228,49 +243,52 @@ public class TestReplicationPolicy {
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[0]);
+ assertEquals(storages[0], targets[0]);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(2, chosenNodes, excludedNodes);
assertEquals(targets.length, 2);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[0], targets[0]);
+
+ assertFalse(isOnSameRack(targets[0], targets[1]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(targets.length, 3);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+ assertEquals(storages[0], targets[0]);
+
+ assertFalse(isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[1], targets[2]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(4, chosenNodes, excludedNodes);
assertEquals(targets.length, 4);
- assertEquals(targets[0], dataNodes[0]);
+ assertEquals(storages[0], targets[0]);
+
for(int i=1; i<4; i++) {
- assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+ assertFalse(isOnSameRack(targets[0], targets[i]));
}
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
- cluster.isOnSameRack(targets[2], targets[3]));
- assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+ assertTrue(isOnSameRack(targets[1], targets[2]) ||
+ isOnSameRack(targets[2], targets[3]));
+ assertFalse(isOnSameRack(targets[1], targets[3]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
- chosenNodes.add(dataNodes[2]);
+ chosenNodes.add(storages[2]);
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
- excludedNodes, BLOCK_SIZE);
+ excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
- for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
+ for (; i < targets.length && !storages[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
@@ -289,34 +307,34 @@ public class TestReplicationPolicy {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[1]);
+ assertEquals(storages[1], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
- assertEquals(targets[0], dataNodes[1]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[1], targets[0]);
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
- assertEquals(targets[0], dataNodes[1]);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[1], targets[0]);
+ assertTrue(isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
- assertEquals(targets[0], dataNodes[1]);
+ assertEquals(storages[1], targets[0]);
for(int i=1; i<4; i++) {
- assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+ assertFalse(isOnSameRack(targets[0], targets[i]));
}
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
- cluster.isOnSameRack(targets[2], targets[3]));
- assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+ assertTrue(isOnSameRack(targets[1], targets[2]) ||
+ isOnSameRack(targets[2], targets[3]));
+ assertFalse(isOnSameRack(targets[1], targets[3]));
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
@@ -340,27 +358,27 @@ public class TestReplicationPolicy {
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
}
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
- assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[i], dataNodes[0]));
}
- assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
- cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertTrue(isOnSameRack(targets[0], targets[1]) ||
+ isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameRack(targets[0], targets[2]));
for(int i=0; i<2; i++) {
dataNodes[i].updateHeartbeat(
@@ -381,7 +399,7 @@ public class TestReplicationPolicy {
DatanodeDescriptor writerDesc =
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4");
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, writerDesc);
assertEquals(targets.length, 0);
@@ -390,12 +408,12 @@ public class TestReplicationPolicy {
targets = chooseTarget(2, writerDesc);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, writerDesc);
assertEquals(targets.length, 3);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
}
/**
@@ -436,7 +454,7 @@ public class TestReplicationPolicy {
// try to choose NUM_OF_DATANODES which is more than actually available
// nodes.
- DatanodeDescriptor[] targets = chooseTarget(NUM_OF_DATANODES);
+ DatanodeStorageInfo[] targets = chooseTarget(NUM_OF_DATANODES);
assertEquals(targets.length, NUM_OF_DATANODES - 2);
final List<LoggingEvent> log = appender.getLog();
@@ -456,18 +474,30 @@ public class TestReplicationPolicy {
}
}
- private boolean containsWithinRange(DatanodeDescriptor target,
+ private boolean containsWithinRange(DatanodeStorageInfo target,
DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length;
assert endIndex >= startIndex && endIndex < nodes.length;
for (int i = startIndex; i <= endIndex; i++) {
- if (nodes[i].equals(target)) {
+ if (nodes[i].equals(target.getDatanodeDescriptor())) {
return true;
}
}
return false;
}
+ private boolean containsWithinRange(DatanodeDescriptor target,
+ DatanodeStorageInfo[] nodes, int startIndex, int endIndex) {
+ assert startIndex >= 0 && startIndex < nodes.length;
+ assert endIndex >= startIndex && endIndex < nodes.length;
+ for (int i = startIndex; i <= endIndex; i++) {
+ if (nodes[i].getDatanodeDescriptor().equals(target)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Test
public void testChooseTargetWithStaleNodes() throws Exception {
// Set dataNodes[0] as stale
@@ -476,19 +506,19 @@ public class TestReplicationPolicy {
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
assertTrue(namenode.getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
// We set the datanode[0] as stale, thus should choose datanode[1] since
// datanode[1] is on the same rack with datanode[0] (writer)
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[1]);
+ assertEquals(storages[1], targets[0]);
Set<Node> excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodes[1]);
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
// reset
dataNodes[0].setLastUpdate(Time.now());
@@ -513,7 +543,7 @@ public class TestReplicationPolicy {
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
- DatanodeDescriptor[] targets = chooseTarget(0);
+ DatanodeStorageInfo[] targets = chooseTarget(0);
assertEquals(targets.length, 0);
// Since we have 6 datanodes total, stale nodes should
@@ -585,11 +615,12 @@ public class TestReplicationPolicy {
.getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
BlockPlacementPolicy replicator = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
- DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
- staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
+ DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
+ staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
+ BLOCK_SIZE, StorageType.DEFAULT);
assertEquals(targets.length, 3);
- assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+ assertFalse(isOnSameRack(targets[0], staleNodeInfo));
// Step 2. Set more than half of the datanodes as stale
for (int i = 0; i < 4; i++) {
@@ -610,10 +641,11 @@ public class TestReplicationPolicy {
assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
// Call chooseTarget
- targets = replicator.chooseTarget(filename, 3,
- staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
+ new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
+ StorageType.DEFAULT);
assertEquals(targets.length, 3);
- assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
+ assertTrue(isOnSameRack(targets[0], staleNodeInfo));
// Step 3. Set 2 stale datanodes back to healthy nodes,
// still have 2 stale nodes
@@ -635,7 +667,7 @@ public class TestReplicationPolicy {
// Call chooseTarget
targets = chooseTarget(3, staleNodeInfo);
assertEquals(targets.length, 3);
- assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+ assertFalse(isOnSameRack(targets[0], staleNodeInfo));
} finally {
miniCluster.shutdown();
}
@@ -650,26 +682,26 @@ public class TestReplicationPolicy {
*/
@Test
public void testRereplicate1() throws Exception {
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodes[0]);
- DatanodeDescriptor[] targets;
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storages[0]);
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, chosenNodes);
assertEquals(targets.length, 3);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], targets[2]));
}
/**
@@ -681,22 +713,22 @@ public class TestReplicationPolicy {
*/
@Test
public void testRereplicate2() throws Exception {
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodes[0]);
- chosenNodes.add(dataNodes[1]);
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storages[0]);
+ chosenNodes.add(storages[1]);
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[1], dataNodes[0]));
}
/**
@@ -708,31 +740,31 @@ public class TestReplicationPolicy {
*/
@Test
public void testRereplicate3() throws Exception {
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodes[0]);
- chosenNodes.add(dataNodes[2]);
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storages[0]);
+ chosenNodes.add(storages[2]);
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[2]));
targets = chooseTarget(1, dataNodes[2], chosenNodes);
assertEquals(targets.length, 1);
- assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[2]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, dataNodes[2], chosenNodes);
assertEquals(targets.length, 2);
- assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[2]));
}
/**
@@ -1174,4 +1206,4 @@ public class TestReplicationPolicy {
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
-}
\ No newline at end of file
+}