You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cn...@apache.org on 2013/12/13 18:28:18 UTC
svn commit: r1550774 [8/10] - in
/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs: ./
src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/
src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apach...
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Fri Dec 13 17:28:14 2013
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,6 +36,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.PathUtils;
@@ -67,6 +71,10 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TestReplicationPolicy {
+ {
+ ((Log4JLogger)BlockPlacementPolicy.LOG).getLogger().setLevel(Level.ALL);
+ }
+
private Random random = DFSUtil.getRandom();
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 6;
@@ -75,6 +83,7 @@ public class TestReplicationPolicy {
private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
private static DatanodeDescriptor dataNodes[];
+ private static DatanodeStorageInfo[] storages;
// The interval for marking a datanode as stale,
private static long staleInterval =
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
@@ -82,17 +91,28 @@ public class TestReplicationPolicy {
@Rule
public ExpectedException exception = ExpectedException.none();
+ private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+ long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+ long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) {
+ dn.getStorageInfos()[0].setUtilizationForTesting(
+ capacity, dfsUsed, remaining, blockPoolUsed);
+ dn.updateHeartbeat(
+ BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+ dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
+ }
+
@BeforeClass
public static void setupCluster() throws Exception {
Configuration conf = new HdfsConfiguration();
- dataNodes = new DatanodeDescriptor[] {
- DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
- DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),
- DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2"),
- DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2"),
- DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d2/r3"),
- DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3")
- };
+ final String[] racks = {
+ "/d1/r1",
+ "/d1/r1",
+ "/d1/r2",
+ "/d1/r2",
+ "/d2/r3",
+ "/d2/r3"};
+ storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+ dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
@@ -117,12 +137,19 @@ public class TestReplicationPolicy {
dataNodes[i]);
}
for (int i=0; i < NUM_OF_DATANODES; i++) {
- dataNodes[i].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
}
+ private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
+ return isOnSameRack(left, right.getDatanodeDescriptor());
+ }
+
+ private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
+ return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
+ }
/**
* In this testcase, client is dataNodes[0]. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on
@@ -134,74 +161,74 @@ public class TestReplicationPolicy {
*/
@Test
public void testChooseTarget1() throws Exception {
- dataNodes[0].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
0L, 0L, 4, 0); // overloaded
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[0]);
+ assertEquals(storages[0], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[0], targets[0]);
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+ assertEquals(storages[0], targets[0]);
+ assertFalse(isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[1], targets[2]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
- assertEquals(targets[0], dataNodes[0]);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
- cluster.isOnSameRack(targets[2], targets[3]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertEquals(storages[0], targets[0]);
+ assertTrue(isOnSameRack(targets[1], targets[2]) ||
+ isOnSameRack(targets[2], targets[3]));
+ assertFalse(isOnSameRack(targets[0], targets[2]));
- dataNodes[0].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
return chooseTarget(numOfReplicas, dataNodes[0]);
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer) {
return chooseTarget(numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>());
+ new ArrayList<DatanodeStorageInfo>());
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- List<DatanodeDescriptor> chosenNodes) {
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+ List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+ DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
}
- private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- List<DatanodeDescriptor> chosenNodes, Set<Node> excludedNodes) {
+ private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+ List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
}
- private static DatanodeDescriptor[] chooseTarget(
+ private static DatanodeStorageInfo[] chooseTarget(
int numOfReplicas,
DatanodeDescriptor writer,
- List<DatanodeDescriptor> chosenNodes,
+ List<DatanodeStorageInfo> chosenNodes,
Set<Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
- false, excludedNodes, BLOCK_SIZE);
+ false, excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
}
/**
@@ -215,8 +242,8 @@ public class TestReplicationPolicy {
@Test
public void testChooseTarget2() throws Exception {
Set<Node> excludedNodes;
- DatanodeDescriptor[] targets;
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ DatanodeStorageInfo[] targets;
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodes[1]);
@@ -228,49 +255,52 @@ public class TestReplicationPolicy {
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[0]);
+ assertEquals(storages[0], targets[0]);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(2, chosenNodes, excludedNodes);
assertEquals(targets.length, 2);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[0], targets[0]);
+
+ assertFalse(isOnSameRack(targets[0], targets[1]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(targets.length, 3);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+ assertEquals(storages[0], targets[0]);
+
+ assertFalse(isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[1], targets[2]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(4, chosenNodes, excludedNodes);
assertEquals(targets.length, 4);
- assertEquals(targets[0], dataNodes[0]);
+ assertEquals(storages[0], targets[0]);
+
for(int i=1; i<4; i++) {
- assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+ assertFalse(isOnSameRack(targets[0], targets[i]));
}
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
- cluster.isOnSameRack(targets[2], targets[3]));
- assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+ assertTrue(isOnSameRack(targets[1], targets[2]) ||
+ isOnSameRack(targets[2], targets[3]));
+ assertFalse(isOnSameRack(targets[1], targets[3]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
- chosenNodes.add(dataNodes[2]);
+ chosenNodes.add(storages[2]);
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
- excludedNodes, BLOCK_SIZE);
+ excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
- for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
+ for (; i < targets.length && !storages[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
@@ -285,41 +315,41 @@ public class TestReplicationPolicy {
@Test
public void testChooseTarget3() throws Exception {
// make data node 0 to be not qualified to choose
- dataNodes[0].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
0L, 0L, 0, 0); // no space
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[1]);
+ assertEquals(storages[1], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
- assertEquals(targets[0], dataNodes[1]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[1], targets[0]);
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
- assertEquals(targets[0], dataNodes[1]);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[1], targets[0]);
+ assertTrue(isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
- assertEquals(targets[0], dataNodes[1]);
+ assertEquals(storages[1], targets[0]);
for(int i=1; i<4; i++) {
- assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+ assertFalse(isOnSameRack(targets[0], targets[i]));
}
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
- cluster.isOnSameRack(targets[2], targets[3]));
- assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+ assertTrue(isOnSameRack(targets[1], targets[2]) ||
+ isOnSameRack(targets[2], targets[3]));
+ assertFalse(isOnSameRack(targets[1], targets[3]));
- dataNodes[0].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
@@ -336,35 +366,35 @@ public class TestReplicationPolicy {
public void testChoooseTarget4() throws Exception {
// make data node 0 & 1 to be not qualified to choose: not enough disk space
for(int i=0; i<2; i++) {
- dataNodes[i].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
- assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[i], dataNodes[0]));
}
- assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
- cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertTrue(isOnSameRack(targets[0], targets[1]) ||
+ isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameRack(targets[0], targets[2]));
for(int i=0; i<2; i++) {
- dataNodes[i].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
@@ -382,7 +412,7 @@ public class TestReplicationPolicy {
DatanodeDescriptor writerDesc =
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4");
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, writerDesc);
assertEquals(targets.length, 0);
@@ -391,12 +421,12 @@ public class TestReplicationPolicy {
targets = chooseTarget(2, writerDesc);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, writerDesc);
assertEquals(targets.length, 3);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
}
/**
@@ -426,7 +456,7 @@ public class TestReplicationPolicy {
public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
// make data node 0 & 1 to be not qualified to choose: not enough disk space
for(int i=0; i<2; i++) {
- dataNodes[i].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
@@ -437,7 +467,7 @@ public class TestReplicationPolicy {
// try to choose NUM_OF_DATANODES which is more than actually available
// nodes.
- DatanodeDescriptor[] targets = chooseTarget(NUM_OF_DATANODES);
+ DatanodeStorageInfo[] targets = chooseTarget(NUM_OF_DATANODES);
assertEquals(targets.length, NUM_OF_DATANODES - 2);
final List<LoggingEvent> log = appender.getLog();
@@ -445,30 +475,42 @@ public class TestReplicationPolicy {
assertFalse(log.size() == 0);
final LoggingEvent lastLogEntry = log.get(log.size() - 1);
- assertEquals(lastLogEntry.getLevel(), Level.WARN);
+ assertTrue(Level.WARN.isGreaterOrEqual(lastLogEntry.getLevel()));
// Suppose to place replicas on each node but two data nodes are not
// available for placing replica, so here we expect a short of 2
assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
for(int i=0; i<2; i++) {
- dataNodes[i].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
}
- private boolean containsWithinRange(DatanodeDescriptor target,
+ private boolean containsWithinRange(DatanodeStorageInfo target,
DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length;
assert endIndex >= startIndex && endIndex < nodes.length;
for (int i = startIndex; i <= endIndex; i++) {
- if (nodes[i].equals(target)) {
+ if (nodes[i].equals(target.getDatanodeDescriptor())) {
return true;
}
}
return false;
}
+ private boolean containsWithinRange(DatanodeDescriptor target,
+ DatanodeStorageInfo[] nodes, int startIndex, int endIndex) {
+ assert startIndex >= 0 && startIndex < nodes.length;
+ assert endIndex >= startIndex && endIndex < nodes.length;
+ for (int i = startIndex; i <= endIndex; i++) {
+ if (nodes[i].getDatanodeDescriptor().equals(target)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Test
public void testChooseTargetWithStaleNodes() throws Exception {
// Set dataNodes[0] as stale
@@ -477,19 +519,19 @@ public class TestReplicationPolicy {
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
assertTrue(namenode.getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
// We set the datanode[0] as stale, thus should choose datanode[1] since
// datanode[1] is on the same rack with datanode[0] (writer)
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[1]);
+ assertEquals(storages[1], targets[0]);
Set<Node> excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodes[1]);
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
// reset
dataNodes[0].setLastUpdate(Time.now());
@@ -514,7 +556,7 @@ public class TestReplicationPolicy {
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
- DatanodeDescriptor[] targets = chooseTarget(0);
+ DatanodeStorageInfo[] targets = chooseTarget(0);
assertEquals(targets.length, 0);
// Since we have 6 datanodes total, stale nodes should
@@ -586,11 +628,12 @@ public class TestReplicationPolicy {
.getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
BlockPlacementPolicy replicator = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
- DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
- staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
+ DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
+ staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
+ BLOCK_SIZE, StorageType.DEFAULT);
assertEquals(targets.length, 3);
- assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+ assertFalse(isOnSameRack(targets[0], staleNodeInfo));
// Step 2. Set more than half of the datanodes as stale
for (int i = 0; i < 4; i++) {
@@ -611,10 +654,11 @@ public class TestReplicationPolicy {
assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
// Call chooseTarget
- targets = replicator.chooseTarget(filename, 3,
- staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
+ new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
+ StorageType.DEFAULT);
assertEquals(targets.length, 3);
- assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
+ assertTrue(isOnSameRack(targets[0], staleNodeInfo));
// Step 3. Set 2 stale datanodes back to healthy nodes,
// still have 2 stale nodes
@@ -636,7 +680,7 @@ public class TestReplicationPolicy {
// Call chooseTarget
targets = chooseTarget(3, staleNodeInfo);
assertEquals(targets.length, 3);
- assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+ assertFalse(isOnSameRack(targets[0], staleNodeInfo));
} finally {
miniCluster.shutdown();
}
@@ -651,26 +695,26 @@ public class TestReplicationPolicy {
*/
@Test
public void testRereplicate1() throws Exception {
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodes[0]);
- DatanodeDescriptor[] targets;
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storages[0]);
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, chosenNodes);
assertEquals(targets.length, 3);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], targets[2]));
}
/**
@@ -682,22 +726,22 @@ public class TestReplicationPolicy {
*/
@Test
public void testRereplicate2() throws Exception {
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodes[0]);
- chosenNodes.add(dataNodes[1]);
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storages[0]);
+ chosenNodes.add(storages[1]);
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[1], dataNodes[0]));
}
/**
@@ -709,31 +753,31 @@ public class TestReplicationPolicy {
*/
@Test
public void testRereplicate3() throws Exception {
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodes[0]);
- chosenNodes.add(dataNodes[2]);
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storages[0]);
+ chosenNodes.add(storages[2]);
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[2]));
targets = chooseTarget(1, dataNodes[2], chosenNodes);
assertEquals(targets.length, 1);
- assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[2]));
+ assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, dataNodes[2], chosenNodes);
assertEquals(targets.length, 2);
- assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
+ assertTrue(isOnSameRack(targets[0], dataNodes[2]));
}
/**
@@ -1077,7 +1121,8 @@ public class TestReplicationPolicy {
// Adding this block will increase its current replication, and that will
// remove it from the queue.
bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
- ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0]);
+ ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0],
+ "STORAGE");
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
@@ -1125,11 +1170,12 @@ public class TestReplicationPolicy {
info.setBlockCollection(mbc);
bm.addBlockCollection(info, mbc);
- DatanodeDescriptor[] dnAry = {dataNodes[0]};
+ DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo(
+ dataNodes[0], new DatanodeStorage("s1"))};
final BlockInfoUnderConstruction ucBlock =
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
- dnAry);
- when(mbc.setLastBlock((BlockInfo) any(), (DatanodeDescriptor[]) any()))
+ storageAry);
+ when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
.thenReturn(ucBlock);
bm.convertLastBlockToUnderConstruction(mbc);
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java Fri Dec 13 17:28:14 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
@@ -57,41 +58,57 @@ public class TestReplicationPolicyWithNo
private BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
- private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
- DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
- DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
- DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2"),
- DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/n3"),
- DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
- DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n4"),
- DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
- DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
- };
-
- private final static DatanodeDescriptor dataNodesInBoundaryCase[] =
- new DatanodeDescriptor[] {
- DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
- DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
- DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n1"),
- DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r1/n2"),
- DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
- DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n3")
- };
-
- private final static DatanodeDescriptor dataNodesInMoreTargetsCase[] =
- new DatanodeDescriptor[] {
- DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/r1/n1"),
- DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/r1/n1"),
- DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/r1/n2"),
- DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/r1/n2"),
- DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/r1/n3"),
- DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/r1/n3"),
- DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/r2/n4"),
- DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/r2/n4"),
- DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/r2/n5"),
- DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/r2/n5"),
- DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/r2/n6"),
- DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/r2/n6"),
+ private static final DatanodeStorageInfo[] storages;
+ private static final DatanodeDescriptor[] dataNodes;
+ static {
+ final String[] racks = {
+ "/d1/r1/n1",
+ "/d1/r1/n1",
+ "/d1/r1/n2",
+ "/d1/r2/n3",
+ "/d1/r2/n3",
+ "/d1/r2/n4",
+ "/d2/r3/n5",
+ "/d2/r3/n6"
+ };
+ storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+ dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
+ }
+
+ private static final DatanodeStorageInfo[] storagesInBoundaryCase;
+ private static final DatanodeDescriptor[] dataNodesInBoundaryCase;
+ static {
+ final String[] racksInBoundaryCase = {
+ "/d1/r1/n1",
+ "/d1/r1/n1",
+ "/d1/r1/n1",
+ "/d1/r1/n2",
+ "/d1/r2/n3",
+ "/d1/r2/n3"
+ };
+ storagesInBoundaryCase = DFSTestUtil.createDatanodeStorageInfos(racksInBoundaryCase);
+ dataNodesInBoundaryCase = DFSTestUtil.toDatanodeDescriptor(storagesInBoundaryCase);
+ }
+
+ private static final DatanodeStorageInfo[] storagesInMoreTargetsCase;
+ private final static DatanodeDescriptor[] dataNodesInMoreTargetsCase;
+ static {
+ final String[] racksInMoreTargetsCase = {
+ "/r1/n1",
+ "/r1/n1",
+ "/r1/n2",
+ "/r1/n2",
+ "/r1/n3",
+ "/r1/n3",
+ "/r2/n4",
+ "/r2/n4",
+ "/r2/n5",
+ "/r2/n5",
+ "/r2/n6",
+ "/r2/n6"
+ };
+ storagesInMoreTargetsCase = DFSTestUtil.createDatanodeStorageInfos(racksInMoreTargetsCase);
+ dataNodesInMoreTargetsCase = DFSTestUtil.toDatanodeDescriptor(storagesInMoreTargetsCase);
};
private final static DatanodeDescriptor NODE =
@@ -129,9 +146,20 @@ public class TestReplicationPolicyWithNo
namenode.stop();
}
+ private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+ long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+ long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
+ int volFailures) {
+ dn.getStorageInfos()[0].setUtilizationForTesting(
+ capacity, dfsUsed, remaining, blockPoolUsed);
+ dn.updateHeartbeat(
+ BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+ dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
+ }
+
private static void setupDataNodeCapacity() {
for(int i=0; i<NUM_OF_DATANODES; i++) {
- dataNodes[i].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
@@ -142,11 +170,12 @@ public class TestReplicationPolicyWithNo
* Return false if two targets are found on the same NodeGroup.
*/
private static boolean checkTargetsOnDifferentNodeGroup(
- DatanodeDescriptor[] targets) {
+ DatanodeStorageInfo[] targets) {
if(targets.length == 0)
return true;
Set<String> targetSet = new HashSet<String>();
- for(DatanodeDescriptor node:targets) {
+ for(DatanodeStorageInfo storage:targets) {
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
if(targetSet.contains(nodeGroup)) {
return false;
@@ -156,34 +185,50 @@ public class TestReplicationPolicyWithNo
}
return true;
}
-
- private DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
+
+ private boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
+ return isOnSameRack(left.getDatanodeDescriptor(), right);
+ }
+
+ private boolean isOnSameRack(DatanodeDescriptor left, DatanodeStorageInfo right) {
+ return cluster.isOnSameRack(left, right.getDatanodeDescriptor());
+ }
+
+ private boolean isOnSameNodeGroup(DatanodeStorageInfo left, DatanodeStorageInfo right) {
+ return isOnSameNodeGroup(left.getDatanodeDescriptor(), right);
+ }
+
+ private boolean isOnSameNodeGroup(DatanodeDescriptor left, DatanodeStorageInfo right) {
+ return cluster.isOnSameNodeGroup(left, right.getDatanodeDescriptor());
+ }
+
+ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
return chooseTarget(numOfReplicas, dataNodes[0]);
}
- private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer) {
return chooseTarget(numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>());
+ new ArrayList<DatanodeStorageInfo>());
}
- private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- List<DatanodeDescriptor> chosenNodes) {
+ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+ List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
}
- private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
+ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+ DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
}
- private DatanodeDescriptor[] chooseTarget(
+ private DatanodeStorageInfo[] chooseTarget(
int numOfReplicas,
DatanodeDescriptor writer,
- List<DatanodeDescriptor> chosenNodes,
+ List<DatanodeStorageInfo> chosenNodes,
Set<Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
- false, excludedNodes, BLOCK_SIZE);
+ false, excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
}
/**
@@ -197,49 +242,53 @@ public class TestReplicationPolicyWithNo
*/
@Test
public void testChooseTarget1() throws Exception {
- dataNodes[0].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
0L, 0L, 4, 0); // overloaded
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[0]);
+ assertEquals(storages[0], targets[0]);
+
targets = chooseTarget(2);
assertEquals(targets.length, 2);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[0], targets[0]);
+
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
- assertEquals(targets[0], dataNodes[0]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
+ assertEquals(storages[0], targets[0]);
+
+ assertFalse(isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameNodeGroup(targets[1], targets[2]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
- assertEquals(targets[0], dataNodes[0]);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
- cluster.isOnSameRack(targets[2], targets[3]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertEquals(storages[0], targets[0]);
+
+ assertTrue(isOnSameRack(targets[1], targets[2]) ||
+ isOnSameRack(targets[2], targets[3]));
+ assertFalse(isOnSameRack(targets[0], targets[2]));
// Make sure no more than one replicas are on the same nodegroup
verifyNoTwoTargetsOnSameNodeGroup(targets);
- dataNodes[0].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
- private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
+ private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeStorageInfo[] targets) {
Set<String> nodeGroupSet = new HashSet<String>();
- for (DatanodeDescriptor target: targets) {
- nodeGroupSet.add(target.getNetworkLocation());
+ for (DatanodeStorageInfo target: targets) {
+ nodeGroupSet.add(target.getDatanodeDescriptor().getNetworkLocation());
}
assertEquals(nodeGroupSet.size(), targets.length);
}
@@ -254,36 +303,37 @@ public class TestReplicationPolicyWithNo
*/
@Test
public void testChooseTarget2() throws Exception {
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
Set<Node> excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodes[1]);
targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
- excludedNodes, BLOCK_SIZE);
+ excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
assertEquals(targets.length, 4);
- assertEquals(targets[0], dataNodes[0]);
+ assertEquals(storages[0], targets[0]);
+
assertTrue(cluster.isNodeGroupAware());
// Make sure no replicas are on the same nodegroup
for (int i=1;i<4;i++) {
- assertFalse(cluster.isOnSameNodeGroup(targets[0], targets[i]));
+ assertFalse(isOnSameNodeGroup(targets[0], targets[i]));
}
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
- cluster.isOnSameRack(targets[2], targets[3]));
- assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+ assertTrue(isOnSameRack(targets[1], targets[2]) ||
+ isOnSameRack(targets[2], targets[3]));
+ assertFalse(isOnSameRack(targets[1], targets[3]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
- chosenNodes.add(dataNodes[2]);
+ chosenNodes.add(storages[2]);
targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
- excludedNodes, BLOCK_SIZE);
+ excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
- for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
+ for(; i < targets.length && !storages[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
@@ -298,39 +348,39 @@ public class TestReplicationPolicyWithNo
@Test
public void testChooseTarget3() throws Exception {
// make data node 0 to be not qualified to choose
- dataNodes[0].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
0L, 0L, 0, 0); // no space
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertEquals(targets[0], dataNodes[1]);
+ assertEquals(storages[1], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
- assertEquals(targets[0], dataNodes[1]);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[1], targets[0]);
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
- assertEquals(targets[0], dataNodes[1]);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertEquals(storages[1], targets[0]);
+ assertTrue(isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
- assertEquals(targets[0], dataNodes[1]);
+ assertEquals(storages[1], targets[0]);
assertTrue(cluster.isNodeGroupAware());
verifyNoTwoTargetsOnSameNodeGroup(targets);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
- cluster.isOnSameRack(targets[2], targets[3]));
+ assertTrue(isOnSameRack(targets[1], targets[2]) ||
+ isOnSameRack(targets[2], targets[3]));
- dataNodes[0].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
@@ -348,33 +398,33 @@ public class TestReplicationPolicyWithNo
public void testChooseTarget4() throws Exception {
// make data node 0-2 to be not qualified to choose: not enough disk space
for(int i=0; i<3; i++) {
- dataNodes[i].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(isOnSameRack(dataNodes[0], targets[0]));
targets = chooseTarget(2);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
- assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+ assertFalse(isOnSameRack(dataNodes[0], targets[i]));
}
verifyNoTwoTargetsOnSameNodeGroup(targets);
- assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
- cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertTrue(isOnSameRack(targets[0], targets[1]) ||
+ isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameRack(targets[0], targets[2]));
}
/**
@@ -387,7 +437,7 @@ public class TestReplicationPolicyWithNo
@Test
public void testChooseTarget5() throws Exception {
setupDataNodeCapacity();
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, NODE);
assertEquals(targets.length, 0);
@@ -396,12 +446,12 @@ public class TestReplicationPolicyWithNo
targets = chooseTarget(2, NODE);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, NODE);
assertEquals(targets.length, 3);
- assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(targets[1], targets[2]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
verifyNoTwoTargetsOnSameNodeGroup(targets);
}
@@ -415,27 +465,27 @@ public class TestReplicationPolicyWithNo
@Test
public void testRereplicate1() throws Exception {
setupDataNodeCapacity();
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodes[0]);
- DatanodeDescriptor[] targets;
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storages[0]);
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(dataNodes[0], targets[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertTrue(isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, chosenNodes);
assertEquals(targets.length, 3);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertTrue(isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameNodeGroup(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(targets[0], targets[2]));
}
/**
@@ -448,22 +498,22 @@ public class TestReplicationPolicyWithNo
@Test
public void testRereplicate2() throws Exception {
setupDataNodeCapacity();
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodes[0]);
- chosenNodes.add(dataNodes[1]);
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storages[0]);
+ chosenNodes.add(storages[1]);
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(dataNodes[0], targets[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) &&
- cluster.isOnSameRack(dataNodes[0], targets[1]));
+ assertFalse(isOnSameRack(dataNodes[0], targets[0]) &&
+ isOnSameRack(dataNodes[0], targets[1]));
}
/**
@@ -476,33 +526,33 @@ public class TestReplicationPolicyWithNo
@Test
public void testRereplicate3() throws Exception {
setupDataNodeCapacity();
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodes[0]);
- chosenNodes.add(dataNodes[3]);
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storages[0]);
+ chosenNodes.add(storages[3]);
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
+ assertTrue(isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameRack(dataNodes[3], targets[0]));
targets = chooseTarget(1, dataNodes[3], chosenNodes);
assertEquals(targets.length, 1);
- assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
- assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
- assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertTrue(isOnSameRack(dataNodes[3], targets[0]));
+ assertFalse(isOnSameNodeGroup(dataNodes[3], targets[0]));
+ assertFalse(isOnSameRack(dataNodes[0], targets[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
- assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
+ assertTrue(isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(isOnSameNodeGroup(dataNodes[0], targets[0]));
targets = chooseTarget(2, dataNodes[3], chosenNodes);
assertEquals(targets.length, 2);
- assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
+ assertTrue(isOnSameRack(dataNodes[3], targets[0]));
}
/**
@@ -576,16 +626,17 @@ public class TestReplicationPolicyWithNo
cluster.add(dataNodesInBoundaryCase[i]);
}
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
- dataNodes[0].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
-
- dataNodesInBoundaryCase[i].updateHeartbeat(
+ (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE,
+ 0L, 0L, 0L, 0, 0);
+
+ updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(0, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 0);
@@ -594,7 +645,7 @@ public class TestReplicationPolicyWithNo
targets = chooseTarget(2, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 2);
- assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 3);
@@ -611,19 +662,17 @@ public class TestReplicationPolicyWithNo
@Test
public void testRereplicateOnBoundaryTopology() throws Exception {
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
- dataNodesInBoundaryCase[i].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
- List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
- chosenNodes.add(dataNodesInBoundaryCase[0]);
- chosenNodes.add(dataNodesInBoundaryCase[5]);
- DatanodeDescriptor[] targets;
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ chosenNodes.add(storagesInBoundaryCase[0]);
+ chosenNodes.add(storagesInBoundaryCase[5]);
+ DatanodeStorageInfo[] targets;
targets = chooseTarget(1, dataNodesInBoundaryCase[0], chosenNodes);
- assertFalse(cluster.isOnSameNodeGroup(targets[0],
- dataNodesInBoundaryCase[0]));
- assertFalse(cluster.isOnSameNodeGroup(targets[0],
- dataNodesInBoundaryCase[5]));
+ assertFalse(isOnSameNodeGroup(dataNodesInBoundaryCase[0], targets[0]));
+ assertFalse(isOnSameNodeGroup(dataNodesInBoundaryCase[5], targets[0]));
assertTrue(checkTargetsOnDifferentNodeGroup(targets));
}
@@ -651,12 +700,12 @@ public class TestReplicationPolicyWithNo
}
for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
- dataNodesInMoreTargetsCase[i].updateHeartbeat(
+ updateHeartbeatWithUsage(dataNodesInMoreTargetsCase[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
- DatanodeDescriptor[] targets;
+ DatanodeStorageInfo[] targets;
// Test normal case -- 3 replicas
targets = chooseTarget(3, dataNodesInMoreTargetsCase[0]);
assertEquals(targets.length, 3);
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java Fri Dec 13 17:28:14 2013
@@ -48,7 +48,8 @@ public class TestUnderReplicatedBlocks {
// but the block does not get put into the under-replicated blocks queue
final BlockManager bm = cluster.getNamesystem().getBlockManager();
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
- DatanodeDescriptor dn = bm.blocksMap.nodeIterator(b.getLocalBlock()).next();
+ DatanodeDescriptor dn = bm.blocksMap.getStorages(b.getLocalBlock())
+ .iterator().next().getDatanodeDescriptor();
bm.addToInvalidates(b.getLocalBlock(), dn);
// Compute the invalidate work in NN, and trigger the heartbeat from DN
BlockManagerTestUtil.computeAllPendingWork(bm);
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java Fri Dec 13 17:28:14 2013
@@ -17,14 +17,37 @@
*/
package org.apache.hadoop.hdfs.server.common;
-import com.google.common.base.Strings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.InetSocketAddress;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.jsp.JspWriter;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.io.DataInputBuffer;
@@ -46,20 +69,7 @@ import org.mockito.stubbing.Answer;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.jsp.JspWriter;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.InetSocketAddress;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import com.google.common.base.Strings;
public class TestJspHelper {
@@ -447,14 +457,28 @@ public class TestJspHelper {
@Test
public void testSortNodeByFields() throws Exception {
- DatanodeID dnId1 = new DatanodeID("127.0.0.1", "localhost1", "storage1",
+ DatanodeID dnId1 = new DatanodeID("127.0.0.1", "localhost1", "datanode1",
1234, 2345, 3456, 4567);
- DatanodeID dnId2 = new DatanodeID("127.0.0.2", "localhost2", "storage2",
+ DatanodeID dnId2 = new DatanodeID("127.0.0.2", "localhost2", "datanode2",
1235, 2346, 3457, 4568);
- DatanodeDescriptor dnDesc1 = new DatanodeDescriptor(dnId1, "rack1", 1024,
- 100, 924, 100, 5l, 3l, 10, 2);
- DatanodeDescriptor dnDesc2 = new DatanodeDescriptor(dnId2, "rack2", 2500,
- 200, 1848, 200, 10l, 2l, 20, 1);
+
+ // Setup DatanodeDescriptors with one storage each.
+ DatanodeDescriptor dnDesc1 = new DatanodeDescriptor(dnId1, "rack1");
+ DatanodeDescriptor dnDesc2 = new DatanodeDescriptor(dnId2, "rack2");
+
+ // Update the DatanodeDescriptors with their attached storages.
+ BlockManagerTestUtil.updateStorage(dnDesc1, new DatanodeStorage("dnStorage1"));
+ BlockManagerTestUtil.updateStorage(dnDesc2, new DatanodeStorage("dnStorage2"));
+
+ StorageReport[] report1 = new StorageReport[] {
+ new StorageReport("dnStorage1", false, 1024, 100, 924, 100)
+ };
+ StorageReport[] report2 = new StorageReport[] {
+ new StorageReport("dnStorage2", false, 2500, 200, 1848, 200)
+ };
+ dnDesc1.updateHeartbeat(report1, 5l, 3l, 10, 2);
+ dnDesc2.updateHeartbeat(report2, 10l, 2l, 20, 1);
+
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
live.add(dnDesc1);
live.add(dnDesc2);
@@ -615,3 +639,4 @@ public class TestJspHelper {
MessageFormat.format(EXPECTED__NOTF_PATTERN, version)));
}
}
+
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Fri Dec 13 17:28:14 2013
@@ -22,11 +22,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -48,7 +48,9 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum;
@@ -74,7 +76,7 @@ public class SimulatedFSDataset implemen
@Override
public SimulatedFSDataset newInstance(DataNode datanode,
DataStorage storage, Configuration conf) throws IOException {
- return new SimulatedFSDataset(datanode, storage, conf);
+ return new SimulatedFSDataset(storage, conf);
}
@Override
@@ -136,6 +138,11 @@ public class SimulatedFSDataset implemen
}
@Override
+ public String getStorageUuid() {
+ return storage.getStorageUuid();
+ }
+
+ @Override
synchronized public long getGenerationStamp() {
return theBlock.getGenerationStamp();
}
@@ -318,13 +325,15 @@ public class SimulatedFSDataset implemen
private static class SimulatedStorage {
private Map<String, SimulatedBPStorage> map =
new HashMap<String, SimulatedBPStorage>();
- private long capacity; // in bytes
+ private final String storageUuid = "SimulatedStroage-" + DatanodeStorage.generateUuid();
+
+ private final long capacity; // in bytes
synchronized long getFree() {
return capacity - getUsed();
}
- synchronized long getCapacity() {
+ long getCapacity() {
return capacity;
}
@@ -379,22 +388,33 @@ public class SimulatedFSDataset implemen
}
return bpStorage;
}
+
+ String getStorageUuid() {
+ return storageUuid;
+ }
+
+ synchronized StorageReport getStorageReport(String bpid) {
+ return new StorageReport(getStorageUuid(), false, getCapacity(),
+ getUsed(), getFree(), map.get(bpid).getUsed());
+ }
}
private final Map<String, Map<Block, BInfo>> blockMap
= new HashMap<String, Map<Block,BInfo>>();
private final SimulatedStorage storage;
- private final String storageId;
+ private final String datanodeUuid;
- public SimulatedFSDataset(DataNode datanode, DataStorage storage,
- Configuration conf) {
+ public SimulatedFSDataset(DataStorage storage, Configuration conf) {
if (storage != null) {
- storage.createStorageID(datanode.getXferPort());
- this.storageId = storage.getStorageID();
+ for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
+ storage.createStorageID(storage.getStorageDir(i));
+ }
+ this.datanodeUuid = storage.getDatanodeUuid();
} else {
- this.storageId = "unknownStorageId" + new Random().nextInt();
+ this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid();
}
- registerMBean(storageId);
+
+ registerMBean(datanodeUuid);
this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
}
@@ -451,8 +471,7 @@ public class SimulatedFSDataset implemen
}
}
- @Override
- public synchronized BlockListAsLongs getBlockReport(String bpid) {
+ synchronized BlockListAsLongs getBlockReport(String bpid) {
final List<Block> blocks = new ArrayList<Block>();
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map != null) {
@@ -465,6 +484,12 @@ public class SimulatedFSDataset implemen
return new BlockListAsLongs(blocks, null);
}
+ @Override
+ public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
+ String bpid) {
+ return Collections.singletonMap(new DatanodeStorage(storage.storageUuid), getBlockReport(bpid));
+ }
+
@Override // FsDatasetSpi
public List<Long> getCacheReport(String bpid) {
return new LinkedList<Long>();
@@ -661,7 +686,7 @@ public class SimulatedFSDataset implemen
}
@Override // FsDatasetSpi
- public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+ public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
@@ -675,6 +700,7 @@ public class SimulatedFSDataset implemen
map.remove(b.getLocalBlock());
binfo.theBlock.setGenerationStamp(newGS);
map.put(binfo.theBlock, binfo);
+ return binfo.getStorageUuid();
}
@Override // FsDatasetSpi
@@ -931,7 +957,7 @@ public class SimulatedFSDataset implemen
@Override
public String getStorageInfo() {
- return "Simulated FSDataset-" + storageId;
+ return "Simulated FSDataset-" + datanodeUuid;
}
@Override
@@ -958,7 +984,8 @@ public class SimulatedFSDataset implemen
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newlength) {
- return storageId;
+ // Caller does not care about the exact Storage UUID returned.
+ return datanodeUuid;
}
@Override // FsDatasetSpi
@@ -1013,11 +1040,6 @@ public class SimulatedFSDataset implemen
}
@Override
- public String[] getBlockPoolList() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FsVolumeSpi vol) {
throw new UnsupportedOperationException();
@@ -1029,7 +1051,12 @@ public class SimulatedFSDataset implemen
}
@Override
- public List<Block> getFinalizedBlocks(String bpid) {
+ public StorageReport[] getStorageReports(String bpid) {
+ return new StorageReport[] {storage.getStorageReport(bpid)};
+ }
+
+ @Override
+ public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
throw new UnsupportedOperationException();
}
@@ -1048,3 +1075,4 @@ public class SimulatedFSDataset implemen
throw new UnsupportedOperationException();
}
}
+
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java Fri Dec 13 17:28:14 2013
@@ -25,7 +25,9 @@ import static org.junit.Assert.assertSam
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -102,7 +104,7 @@ public class TestBPOfferService {
.when(mockDn).getMetrics();
// Set up a simulated dataset with our fake BP
- mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, null, conf));
+ mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
mockFSDataset.addBlockPool(FAKE_BPID, conf);
// Wire the dataset to the DN.
@@ -178,7 +180,7 @@ public class TestBPOfferService {
waitForBlockReport(mockNN2);
// When we receive a block, it should report it to both NNs
- bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "");
+ bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "", "");
ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
assertEquals(1, ret.length);
@@ -294,6 +296,47 @@ public class TestBPOfferService {
}
}
+ /**
+ * Test datanode block pool initialization error handling.
+ * Failure in initializing a block pool should not cause NPE.
+ */
+ @Test
+ public void testBPInitErrorHandling() throws Exception {
+ final DataNode mockDn = Mockito.mock(DataNode.class);
+ Mockito.doReturn(true).when(mockDn).shouldRun();
+ Configuration conf = new Configuration();
+ File dnDataDir = new File(
+ new File(TEST_BUILD_DATA, "testBPInitErrorHandling"), "data");
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, dnDataDir.toURI().toString());
+ Mockito.doReturn(conf).when(mockDn).getConf();
+ Mockito.doReturn(new DNConf(conf)).when(mockDn).getDnConf();
+ Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
+ when(mockDn).getMetrics();
+ final AtomicInteger count = new AtomicInteger();
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ if (count.getAndIncrement() == 0) {
+ throw new IOException("faked initBlockPool exception");
+ }
+ // The initBlockPool is called again. Now mock init is done.
+ Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
+ return null;
+ }
+ }).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
+ BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
+ bpos.start();
+ try {
+ waitForInitialization(bpos);
+ List<BPServiceActor> actors = bpos.getBPServiceActors();
+ assertEquals(1, actors.size());
+ BPServiceActor actor = actors.get(0);
+ waitForBlockReport(actor.getNameNodeProxy());
+ } finally {
+ bpos.stop();
+ }
+ }
+
private void waitForOneToFail(final BPOfferService bpos)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@@ -311,6 +354,11 @@ public class TestBPOfferService {
*/
private BPOfferService setupBPOSForNNs(
DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
+ return setupBPOSForNNs(mockDn, nns);
+ }
+
+ private BPOfferService setupBPOSForNNs(DataNode mockDn,
+ DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
// Set up some fake InetAddresses, then override the connectToNN
// function to return the corresponding proxies.
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Fri Dec 13 17:28:14 2013
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -121,7 +122,7 @@ public class TestBlockRecovery {
* @throws IOException
*/
@Before
- public void startUp() throws IOException {
+ public void startUp() throws IOException, URISyntaxException {
tearDownDone = false;
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
@@ -131,11 +132,12 @@ public class TestBlockRecovery {
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
FileSystem.setDefaultUri(conf,
"hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
- ArrayList<File> dirs = new ArrayList<File>();
+ ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
File dataDir = new File(DATA_DIR);
FileUtil.fullyDelete(dataDir);
dataDir.mkdirs();
- dirs.add(dataDir);
+ StorageLocation location = StorageLocation.parse(dataDir.getPath());
+ locations.add(location);
final DatanodeProtocolClientSideTranslatorPB namenode =
mock(DatanodeProtocolClientSideTranslatorPB.class);
@@ -163,7 +165,7 @@ public class TestBlockRecovery {
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1)));
- dn = new DataNode(conf, dirs, null) {
+ dn = new DataNode(conf, locations, null) {
@Override
DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException {
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Fri Dec 13 17:28:14 2013
@@ -265,7 +265,7 @@ public class TestBlockReplacement {
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
- source.getStorageID(), sourceProxy);
+ source.getDatanodeUuid(), sourceProxy);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());