You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC
svn commit: r1619012 [29/35] - in
/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java Tue Aug 19 23:49:39 2014
@@ -158,21 +158,23 @@ public class UpgradeUtilities {
FileUtil.fullyDelete(new File(datanodeStorage,"in_use.lock"));
}
namenodeStorageChecksum = checksumContents(NAME_NODE,
- new File(namenodeStorage, "current"));
+ new File(namenodeStorage, "current"), false);
File dnCurDir = new File(datanodeStorage, "current");
- datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir);
+ datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir, false);
File bpCurDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
"current");
- blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir);
+ blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir, false);
File bpCurFinalizeDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
"current/"+DataStorage.STORAGE_DIR_FINALIZED);
- blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE, bpCurFinalizeDir);
+ blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE,
+ bpCurFinalizeDir, true);
File bpCurRbwDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
"current/"+DataStorage.STORAGE_DIR_RBW);
- blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir);
+ blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir,
+ false);
}
// Private helper method that writes a file to the given file system.
@@ -266,36 +268,47 @@ public class UpgradeUtilities {
/**
* Compute the checksum of all the files in the specified directory.
- * The contents of subdirectories are not included. This method provides
- * an easy way to ensure equality between the contents of two directories.
+ * This method provides an easy way to ensure equality between the contents
+ * of two directories.
*
* @param nodeType if DATA_NODE then any file named "VERSION" is ignored.
* This is because this file file is changed every time
* the Datanode is started.
- * @param dir must be a directory. Subdirectories are ignored.
+ * @param dir must be a directory
+ * @param recursive whether or not to consider subdirectories
*
* @throws IllegalArgumentException if specified directory is not a directory
* @throws IOException if an IOException occurs while reading the files
* @return the computed checksum value
*/
- public static long checksumContents(NodeType nodeType, File dir) throws IOException {
+ public static long checksumContents(NodeType nodeType, File dir,
+ boolean recursive) throws IOException {
+ CRC32 checksum = new CRC32();
+ checksumContentsHelper(nodeType, dir, checksum, recursive);
+ return checksum.getValue();
+ }
+
+ public static void checksumContentsHelper(NodeType nodeType, File dir,
+ CRC32 checksum, boolean recursive) throws IOException {
if (!dir.isDirectory()) {
throw new IllegalArgumentException(
- "Given argument is not a directory:" + dir);
+ "Given argument is not a directory:" + dir);
}
File[] list = dir.listFiles();
Arrays.sort(list);
- CRC32 checksum = new CRC32();
for (int i = 0; i < list.length; i++) {
if (!list[i].isFile()) {
+ if (recursive) {
+ checksumContentsHelper(nodeType, list[i], checksum, recursive);
+ }
continue;
}
// skip VERSION and dfsUsed file for DataNodes
- if (nodeType == DATA_NODE &&
- (list[i].getName().equals("VERSION") ||
- list[i].getName().equals("dfsUsed"))) {
- continue;
+ if (nodeType == DATA_NODE &&
+ (list[i].getName().equals("VERSION") ||
+ list[i].getName().equals("dfsUsed"))) {
+ continue;
}
FileInputStream fis = null;
@@ -312,7 +325,6 @@ public class UpgradeUtilities {
}
}
}
- return checksum.getValue();
}
/**
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java Tue Aug 19 23:49:39 2014
@@ -49,6 +49,26 @@ public class TestExtendedBlock {
new ExtendedBlock(POOL_A, BLOCK_1_GS1),
new ExtendedBlock(POOL_A, BLOCK_1_GS2));
}
+
+ @Test
+ public void testHashcode() {
+
+ // Different pools, same block id -> different hashcode
+ assertNotEquals(
+ new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(),
+ new ExtendedBlock(POOL_B, BLOCK_1_GS1).hashCode());
+
+ // Same pool, different block id -> different hashcode
+ assertNotEquals(
+ new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(),
+ new ExtendedBlock(POOL_A, BLOCK_2_GS1).hashCode());
+
+ // Same block -> same hashcode
+ assertEquals(
+ new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(),
+ new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode());
+
+ }
private static void assertNotEquals(Object a, Object b) {
assertFalse("expected not equal: '" + a + "' and '" + b + "'",
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Tue Aug 19 23:49:39 2014
@@ -31,25 +31,25 @@ import org.apache.hadoop.fs.permission.A
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@@ -67,9 +67,18 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
-import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
@@ -175,8 +184,10 @@ public class TestPBHelper {
private static BlockWithLocations getBlockWithLocations(int bid) {
final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
final String[] storageIDs = {"s1", "s2", "s3"};
+ final StorageType[] storageTypes = {
+ StorageType.DISK, StorageType.DISK, StorageType.DISK};
return new BlockWithLocations(new Block(bid, 0, 1),
- datanodeUuids, storageIDs);
+ datanodeUuids, storageIDs, storageTypes);
}
private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
@@ -550,8 +561,10 @@ public class TestPBHelper {
dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
+ StorageType[][] storageTypes = {{StorageType.DEFAULT},
+ {StorageType.DEFAULT, StorageType.DEFAULT}};
BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
- blocks, dnInfos, storageIDs);
+ blocks, dnInfos, storageTypes, storageIDs);
BlockCommandProto bcProto = PBHelper.convert(bc);
BlockCommand bc2 = PBHelper.convert(bcProto);
assertEquals(bc.getAction(), bc2.getAction());
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java Tue Aug 19 23:49:39 2014
@@ -22,8 +22,12 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import java.io.IOException;
+import java.net.BindException;
import java.net.URI;
+import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -37,14 +41,13 @@ public class MiniQJMHACluster {
private MiniDFSCluster cluster;
private MiniJournalCluster journalCluster;
private final Configuration conf;
+ private static final Log LOG = LogFactory.getLog(MiniQJMHACluster.class);
public static final String NAMESERVICE = "ns1";
private static final String NN1 = "nn1";
private static final String NN2 = "nn2";
- private static final int NN1_IPC_PORT = 10000;
- private static final int NN1_INFO_PORT = 10001;
- private static final int NN2_IPC_PORT = 10002;
- private static final int NN2_INFO_PORT = 10003;
+ private static final Random RANDOM = new Random();
+ private int basePort = 10000;
public static class Builder {
private final Configuration conf;
@@ -69,51 +72,62 @@ public class MiniQJMHACluster {
}
}
- public static MiniDFSNNTopology createDefaultTopology() {
+ public static MiniDFSNNTopology createDefaultTopology(int basePort) {
return new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
- new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT)
- .setHttpPort(NN1_INFO_PORT)).addNN(
- new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
- .setHttpPort(NN2_INFO_PORT)));
+ new MiniDFSNNTopology.NNConf("nn1").setIpcPort(basePort)
+ .setHttpPort(basePort + 1)).addNN(
+ new MiniDFSNNTopology.NNConf("nn2").setIpcPort(basePort + 2)
+ .setHttpPort(basePort + 3)));
}
-
+
private MiniQJMHACluster(Builder builder) throws IOException {
this.conf = builder.conf;
- // start 3 journal nodes
- journalCluster = new MiniJournalCluster.Builder(conf).format(true)
- .build();
- URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
-
- // start cluster with 2 NameNodes
- MiniDFSNNTopology topology = createDefaultTopology();
-
- initHAConf(journalURI, builder.conf);
-
- // First start up the NNs just to format the namespace. The MinIDFSCluster
- // has no way to just format the NameNodes without also starting them.
- cluster = builder.dfsBuilder.nnTopology(topology)
- .manageNameDfsSharedDirs(false).build();
- cluster.waitActive();
- cluster.shutdown();
-
- // initialize the journal nodes
- Configuration confNN0 = cluster.getConfiguration(0);
- NameNode.initializeSharedEdits(confNN0, true);
-
- cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
- cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
-
- // restart the cluster
- cluster.restartNameNodes();
+ int retryCount = 0;
+ while (true) {
+ try {
+ basePort = 10000 + RANDOM.nextInt(1000) * 4;
+ // start 3 journal nodes
+ journalCluster = new MiniJournalCluster.Builder(conf).format(true)
+ .build();
+ URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
+
+ // start cluster with 2 NameNodes
+ MiniDFSNNTopology topology = createDefaultTopology(basePort);
+
+ initHAConf(journalURI, builder.conf);
+
+ // First start up the NNs just to format the namespace. The MiniDFSCluster
+ // has no way to just format the NameNodes without also starting them.
+ cluster = builder.dfsBuilder.nnTopology(topology)
+ .manageNameDfsSharedDirs(false).build();
+ cluster.waitActive();
+ cluster.shutdown();
+
+ // initialize the journal nodes
+ Configuration confNN0 = cluster.getConfiguration(0);
+ NameNode.initializeSharedEdits(confNN0, true);
+
+ cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
+ cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
+
+ // restart the cluster
+ cluster.restartNameNodes();
+ ++retryCount;
+ break;
+ } catch (BindException e) {
+ LOG.info("MiniQJMHACluster port conflicts, retried " +
+ retryCount + " times");
+ }
+ }
}
private Configuration initHAConf(URI journalURI, Configuration conf) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
journalURI.toString());
- String address1 = "127.0.0.1:" + NN1_IPC_PORT;
- String address2 = "127.0.0.1:" + NN2_IPC_PORT;
+ String address1 = "127.0.0.1:" + basePort;
+ String address2 = "127.0.0.1:" + (basePort + 2);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
NAMESERVICE, NN1), address1);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Tue Aug 19 23:49:39 2014
@@ -21,16 +21,12 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
-import java.net.URL;
-import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -41,6 +37,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
public class TestNNWithQJM {
final Configuration conf = new HdfsConfiguration();
private MiniJournalCluster mjc = null;
@@ -204,55 +201,4 @@ public class TestNNWithQJM {
"Unable to start log segment 1: too few journals", ioe);
}
}
-
- @Test (timeout = 30000)
- public void testWebPageHasQjmInfo() throws Exception {
- conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
- MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
- conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
- mjc.getQuorumJournalURI("myjournal").toString());
- // Speed up the test
- conf.setInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
-
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(0)
- .manageNameDfsDirs(false)
- .build();
- try {
- URL url = new URL("http://localhost:"
- + NameNode.getHttpAddress(cluster.getConfiguration(0)).getPort()
- + "/dfshealth.jsp");
-
- cluster.getFileSystem().mkdirs(TEST_PATH);
-
- String contents = DFSTestUtil.urlGet(url);
- assertTrue(contents.contains("QJM to ["));
- assertTrue(contents.contains("Written txid 2"));
-
- // Stop one JN, do another txn, and make sure it shows as behind
- // stuck behind the others.
- mjc.getJournalNode(0).stopAndJoin(0);
-
- cluster.getFileSystem().delete(TEST_PATH, true);
-
- contents = DFSTestUtil.urlGet(url);
- System.out.println(contents);
- assertTrue(Pattern.compile("1 txns/\\d+ms behind").matcher(contents)
- .find());
-
- // Restart NN while JN0 is still down.
- cluster.restartNameNode();
-
- contents = DFSTestUtil.urlGet(url);
- System.out.println(contents);
- assertTrue(Pattern.compile("never written").matcher(contents)
- .find());
-
-
- } finally {
- cluster.shutdown();
- }
-
- }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Tue Aug 19 23:49:39 2014
@@ -25,7 +25,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.List;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@@ -208,7 +208,7 @@ public class TestQuorumJournalManagerUni
anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
// And the third log not respond
- SettableFuture<Void> slowLog = SettableFuture.<Void>create();
+ SettableFuture<Void> slowLog = SettableFuture.create();
Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits(
anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
stm.flush();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Tue Aug 19 23:49:39 2014
@@ -170,11 +170,6 @@ public class TestJournalNode {
assertTrue("Bad contents: " + pageContents,
pageContents.contains(
"Hadoop:service=JournalNode,name=JvmMetrics"));
-
- // Check JSP page.
- pageContents = DFSTestUtil.urlGet(
- new URL(urlRoot + "/journalstatus.jsp"));
- assertTrue(pageContents.contains("JournalNode"));
// Create some edits on server side
byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java Tue Aug 19 23:49:39 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.web.WebHdf
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.security.TestDoAsEffectiveUser;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.junit.AfterClass;
@@ -88,7 +89,8 @@ public class TestDelegationTokenForProxy
builder.append("127.0.1.1,");
builder.append(InetAddress.getLocalHost().getCanonicalHostName());
LOG.info("Local Ip addresses: " + builder.toString());
- conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName),
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(superUserShortName),
builder.toString());
}
@@ -100,7 +102,8 @@ public class TestDelegationTokenForProxy
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000);
config.setLong(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000);
- config.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER),
+ config.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER),
"group1");
config.setBoolean(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Tue Aug 19 23:49:39 2014
@@ -18,17 +18,23 @@
package org.apache.hadoop.hdfs.server.balancer;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -48,6 +54,8 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
+import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -81,7 +89,7 @@ public class TestBalancer {
private static final Random r = new Random();
static {
- Balancer.setBlockMoveWaitTime(1000L) ;
+ Dispatcher.setBlockMoveWaitTime(1000L) ;
}
static void initConf(Configuration conf) {
@@ -255,6 +263,18 @@ public class TestBalancer {
}
}
}
+
+ /**
+ * Wait until balanced: each datanode gives utilization within
+ * BALANCE_ALLOWED_VARIANCE of average
+ * @throws IOException
+ * @throws TimeoutException
+ */
+ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
+ ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p)
+ throws IOException, TimeoutException {
+ waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
+ }
/**
* Wait until balanced: each datanode gives utilization within
@@ -263,11 +283,17 @@ public class TestBalancer {
* @throws TimeoutException
*/
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
- ClientProtocol client, MiniDFSCluster cluster)
- throws IOException, TimeoutException {
+ ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p,
+ int expectedExcludedNodes) throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.now() + timeout;
+ if (!p.nodesToBeIncluded.isEmpty()) {
+ totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
+ }
+ if (!p.nodesToBeExcluded.isEmpty()) {
+ totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
+ }
final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
boolean balanced;
do {
@@ -275,9 +301,20 @@ public class TestBalancer {
client.getDatanodeReport(DatanodeReportType.ALL);
assertEquals(datanodeReport.length, cluster.getDataNodes().size());
balanced = true;
+ int actualExcludedNodeCount = 0;
for (DatanodeInfo datanode : datanodeReport) {
double nodeUtilization = ((double)datanode.getDfsUsed())
/ datanode.getCapacity();
+ if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) {
+ assertTrue(nodeUtilization == 0);
+ actualExcludedNodeCount++;
+ continue;
+ }
+ if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) {
+ assertTrue(nodeUtilization == 0);
+ actualExcludedNodeCount++;
+ continue;
+ }
if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
balanced = false;
if (Time.now() > failtime) {
@@ -294,6 +331,7 @@ public class TestBalancer {
break;
}
}
+ assertEquals(expectedExcludedNodes,actualExcludedNodeCount);
} while (!balanced);
}
@@ -307,22 +345,118 @@ public class TestBalancer {
}
return b.append("]").toString();
}
- /** This test start a cluster with specified number of nodes,
+ /**
+ * Class which contains information about the
+ * new nodes to be added to the cluster for balancing.
+ */
+ static abstract class NewNodeInfo {
+
+ Set<String> nodesToBeExcluded = new HashSet<String>();
+ Set<String> nodesToBeIncluded = new HashSet<String>();
+
+ abstract String[] getNames();
+ abstract int getNumberofNewNodes();
+ abstract int getNumberofIncludeNodes();
+ abstract int getNumberofExcludeNodes();
+
+ public Set<String> getNodesToBeIncluded() {
+ return nodesToBeIncluded;
+ }
+ public Set<String> getNodesToBeExcluded() {
+ return nodesToBeExcluded;
+ }
+ }
+
+ /**
+ * The host names of new nodes are specified
+ */
+ static class HostNameBasedNodes extends NewNodeInfo {
+ String[] hostnames;
+
+ public HostNameBasedNodes(String[] hostnames,
+ Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
+ this.hostnames = hostnames;
+ this.nodesToBeExcluded = nodesToBeExcluded;
+ this.nodesToBeIncluded = nodesToBeIncluded;
+ }
+
+ @Override
+ String[] getNames() {
+ return hostnames;
+ }
+ @Override
+ int getNumberofNewNodes() {
+ return hostnames.length;
+ }
+ @Override
+ int getNumberofIncludeNodes() {
+ return nodesToBeIncluded.size();
+ }
+ @Override
+ int getNumberofExcludeNodes() {
+ return nodesToBeExcluded.size();
+ }
+ }
+
+ /**
+ * The number of data nodes to be started is specified.
+ * The data nodes will have same host name, but different port numbers.
+ *
+ */
+ static class PortNumberBasedNodes extends NewNodeInfo {
+ int newNodes;
+ int excludeNodes;
+ int includeNodes;
+
+ public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) {
+ this.newNodes = newNodes;
+ this.excludeNodes = excludeNodes;
+ this.includeNodes = includeNodes;
+ }
+
+ @Override
+ String[] getNames() {
+ return null;
+ }
+ @Override
+ int getNumberofNewNodes() {
+ return newNodes;
+ }
+ @Override
+ int getNumberofIncludeNodes() {
+ return includeNodes;
+ }
+ @Override
+ int getNumberofExcludeNodes() {
+ return excludeNodes;
+ }
+ }
+
+ private void doTest(Configuration conf, long[] capacities, String[] racks,
+ long newCapacity, String newRack, boolean useTool) throws Exception {
+ doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false);
+ }
+
+ /** This test start a cluster with specified number of nodes,
* and fills it to be 30% full (with a single file replicated identically
* to all datanodes);
* It then adds one new empty node and starts balancing.
- *
+ *
* @param conf - configuration
* @param capacities - array of capacities of original nodes in cluster
* @param racks - array of racks for original nodes in cluster
* @param newCapacity - new node's capacity
* @param newRack - new node's rack
+ * @param nodes - information about new nodes to be started.
* @param useTool - if true run test via Cli with command-line argument
* parsing, etc. Otherwise invoke balancer API directly.
+ * @param useFile - if true, the hosts to be included or excluded will be stored in a
+ * file and then later read from the file.
* @throws Exception
*/
- private void doTest(Configuration conf, long[] capacities, String[] racks,
- long newCapacity, String newRack, boolean useTool) throws Exception {
+ private void doTest(Configuration conf, long[] capacities,
+ String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
+ boolean useTool, boolean useFile) throws Exception {
LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity);
@@ -346,17 +480,75 @@ public class TestBalancer {
long totalUsedSpace = totalCapacity*3/10;
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0);
- // start up an empty node with the same capacity and on the same rack
- cluster.startDataNodes(conf, 1, true, null,
- new String[]{newRack}, new long[]{newCapacity});
- totalCapacity += newCapacity;
+ if (nodes == null) { // there is no specification of new nodes.
+ // start up an empty node with the same capacity and on the same rack
+ cluster.startDataNodes(conf, 1, true, null,
+ new String[]{newRack}, null,new long[]{newCapacity});
+ totalCapacity += newCapacity;
+ } else {
+ //if running a test with "include list", include original nodes as well
+ if (nodes.getNumberofIncludeNodes()>0) {
+ for (DataNode dn: cluster.getDataNodes())
+ nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName());
+ }
+ String[] newRacks = new String[nodes.getNumberofNewNodes()];
+ long[] newCapacities = new long[nodes.getNumberofNewNodes()];
+ for (int i=0; i < nodes.getNumberofNewNodes(); i++) {
+ newRacks[i] = newRack;
+ newCapacities[i] = newCapacity;
+ }
+ // if host names are specified for the new nodes to be created.
+ if (nodes.getNames() != null) {
+ cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+ newRacks, nodes.getNames(), newCapacities);
+ totalCapacity += newCapacity*nodes.getNumberofNewNodes();
+ } else { // host names are not specified
+ cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+ newRacks, null, newCapacities);
+ totalCapacity += newCapacity*nodes.getNumberofNewNodes();
+ //populate the include nodes
+ if (nodes.getNumberofIncludeNodes() > 0) {
+ int totalNodes = cluster.getDataNodes().size();
+ for (int i=0; i < nodes.getNumberofIncludeNodes(); i++) {
+ nodes.getNodesToBeIncluded().add (cluster.getDataNodes().get(
+ totalNodes-1-i).getDatanodeId().getXferAddr());
+ }
+ }
+ //populate the exclude nodes
+ if (nodes.getNumberofExcludeNodes() > 0) {
+ int totalNodes = cluster.getDataNodes().size();
+ for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) {
+ nodes.getNodesToBeExcluded().add (cluster.getDataNodes().get(
+ totalNodes-1-i).getDatanodeId().getXferAddr());
+ }
+ }
+ }
+ }
+ // build the balancer parameters from the exclude/include node lists
+ Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+ if (nodes != null) {
+ p = new Balancer.Parameters(
+ Balancer.Parameters.DEFAULT.policy,
+ Balancer.Parameters.DEFAULT.threshold,
+ nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
+ }
+
+ int expectedExcludedNodes = 0;
+ if (nodes != null) {
+ if (!nodes.getNodesToBeExcluded().isEmpty()) {
+ expectedExcludedNodes = nodes.getNodesToBeExcluded().size();
+ } else if (!nodes.getNodesToBeIncluded().isEmpty()) {
+ expectedExcludedNodes =
+ cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size();
+ }
+ }
// run balancer and validate results
if (useTool) {
- runBalancerCli(conf, totalUsedSpace, totalCapacity);
+ runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes);
} else {
- runBalancer(conf, totalUsedSpace, totalCapacity);
+ runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
}
} finally {
cluster.shutdown();
@@ -365,31 +557,86 @@ public class TestBalancer {
private void runBalancer(Configuration conf,
long totalUsedSpace, long totalCapacity) throws Exception {
+ runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0);
+ }
+
+ private void runBalancer(Configuration conf,
+ long totalUsedSpace, long totalCapacity, Balancer.Parameters p,
+ int excludedNodes) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
- assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
-
+ final int r = Balancer.run(namenodes, p, conf);
+ if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+ DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
+ assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+ return;
+ } else {
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+ }
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
LOG.info("Rebalancing with default ctor.");
- waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+ waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
}
-
+
private void runBalancerCli(Configuration conf,
- long totalUsedSpace, long totalCapacity) throws Exception {
+ long totalUsedSpace, long totalCapacity,
+ Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+ List <String> args = new ArrayList<String>();
+ args.add("-policy");
+ args.add("datanode");
+
+ File excludeHostsFile = null;
+ if (!p.nodesToBeExcluded.isEmpty()) {
+ args.add("-exclude");
+ if (useFile) {
+ excludeHostsFile = new File ("exclude-hosts-file");
+ PrintWriter pw = new PrintWriter(excludeHostsFile);
+ for (String host: p.nodesToBeExcluded) {
+ pw.write( host + "\n");
+ }
+ pw.close();
+ args.add("-f");
+ args.add("exclude-hosts-file");
+ } else {
+ args.add(StringUtils.join(p.nodesToBeExcluded, ','));
+ }
+ }
+
+ File includeHostsFile = null;
+ if (!p.nodesToBeIncluded.isEmpty()) {
+ args.add("-include");
+ if (useFile) {
+ includeHostsFile = new File ("include-hosts-file");
+ PrintWriter pw = new PrintWriter(includeHostsFile);
+ for (String host: p.nodesToBeIncluded){
+ pw.write( host + "\n");
+ }
+ pw.close();
+ args.add("-f");
+ args.add("include-hosts-file");
+ } else {
+ args.add(StringUtils.join(p.nodesToBeIncluded, ','));
+ }
+ }
- final String[] args = { "-policy", "datanode" };
final Tool tool = new Cli();
tool.setConf(conf);
- final int r = tool.run(args); // start rebalancing
+ final int r = tool.run(args.toArray(new String[0])); // start rebalancing
assertEquals("Tools should exit 0 on success", 0, r);
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
LOG.info("Rebalancing with default ctor.");
- waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+ waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes);
+
+ if (excludeHostsFile != null && excludeHostsFile.exists()) {
+ excludeHostsFile.delete();
+ }
+ if (includeHostsFile != null && includeHostsFile.exists()) {
+ includeHostsFile.delete();
+ }
}
/** one-node cluster test*/
@@ -411,6 +658,71 @@ public class TestBalancer {
oneNodeTest(conf, false);
}
+ /* We first start a cluster and fill the cluster up to a certain size,
+ * then redistribute blocks according to the required distribution.
+ * Then we start an empty datanode.
+ * Afterwards a balancer is run to balance the cluster.
+ * A partially filled datanode is excluded during balancing.
+ * This triggers a situation where one of a block's locations is unknown.
+ */
+ @Test(timeout=100000)
+ public void testUnknownDatanode() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ long distribution[] = new long[] {50*CAPACITY/100, 70*CAPACITY/100, 0*CAPACITY/100};
+ long capacities[] = new long[]{CAPACITY, CAPACITY, CAPACITY};
+ String racks[] = new String[] {RACK0, RACK1, RACK1};
+
+ int numDatanodes = distribution.length;
+ if (capacities.length != numDatanodes || racks.length != numDatanodes) {
+ throw new IllegalArgumentException("Array length is not the same");
+ }
+
+ // calculate total space that need to be filled
+ final long totalUsedSpace = sum(distribution);
+
+ // fill the cluster
+ ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace,
+ (short) numDatanodes);
+
+ // redistribute blocks
+ Block[][] blocksDN = distributeBlocks(
+ blocks, (short)(numDatanodes-1), distribution);
+
+ // restart the cluster: do NOT format the cluster
+ conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+ .format(false)
+ .racks(racks)
+ .simulatedCapacities(capacities)
+ .build();
+ try {
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
+
+ for(int i = 0; i < 3; i++) {
+ cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
+ }
+
+ cluster.startDataNodes(conf, 1, true, null,
+ new String[]{RACK0}, null,new long[]{CAPACITY});
+ cluster.triggerHeartbeats();
+
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ Set<String> datanodes = new HashSet<String>();
+ datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
+ Balancer.Parameters p = new Balancer.Parameters(
+ Balancer.Parameters.DEFAULT.policy,
+ Balancer.Parameters.DEFAULT.threshold,
+ datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
+ final int r = Balancer.run(namenodes, p, conf);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
/**
* Test parse method in Balancer#Cli class with threshold value out of
* boundaries.
@@ -435,7 +747,7 @@ public class TestBalancer {
}
}
- /** Test a cluster with even distribution,
+ /** Test a cluster with even distribution,
* then a new empty node is added to the cluster*/
@Test(timeout=100000)
public void testBalancer0() throws Exception {
@@ -463,6 +775,20 @@ public class TestBalancer {
}
@Test(timeout=100000)
+ public void testBalancerWithZeroThreadsForMove() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
+ testBalancer1Internal (conf);
+ }
+
+ @Test(timeout=100000)
+ public void testBalancerWithNonZeroThreadsForMove() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
+ testBalancer1Internal (conf);
+ }
+
+ @Test(timeout=100000)
public void testBalancer2() throws Exception {
testBalancer2Internal(new HdfsConfiguration());
}
@@ -528,14 +854,49 @@ public class TestBalancer {
} catch (IllegalArgumentException e) {
}
- parameters = new String[] { "-threshold 1 -policy" };
+ parameters = new String[] {"-threshold", "1", "-policy"};
+ try {
+ Balancer.Cli.parse(parameters);
+ fail(reason);
+ } catch (IllegalArgumentException e) {
+
+ }
+ parameters = new String[] {"-threshold", "1", "-include"};
try {
Balancer.Cli.parse(parameters);
fail(reason);
} catch (IllegalArgumentException e) {
}
+ parameters = new String[] {"-threshold", "1", "-exclude"};
+ try {
+ Balancer.Cli.parse(parameters);
+ fail(reason);
+ } catch (IllegalArgumentException e) {
+ }
+ parameters = new String[] {"-include", "-f"};
+ try {
+ Balancer.Cli.parse(parameters);
+ fail(reason);
+ } catch (IllegalArgumentException e) {
+
+ }
+ parameters = new String[] {"-exclude", "-f"};
+ try {
+ Balancer.Cli.parse(parameters);
+ fail(reason);
+ } catch (IllegalArgumentException e) {
+
+ }
+
+ parameters = new String[] {"-include", "testnode1", "-exclude", "testnode2"};
+ try {
+ Balancer.Cli.parse(parameters);
+ fail("IllegalArgumentException is expected when both -exclude and -include are specified");
+ } catch (IllegalArgumentException e) {
+
+ }
}
/**
@@ -551,6 +912,183 @@ public class TestBalancer {
}
/**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list
+ */
+ @Test(timeout=100000)
+ public void testBalancerWithExcludeList() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> excludeHosts = new HashSet<String>();
+ excludeHosts.add( "datanodeY");
+ excludeHosts.add( "datanodeZ");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list
+ */
+ @Test(timeout=100000)
+ public void testBalancerWithExcludeListWithPorts() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithExcludeList() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> excludeHosts = new HashSet<String>();
+ excludeHosts.add( "datanodeY");
+ excludeHosts.add( "datanodeZ");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
+ Parameters.DEFAULT.nodesToBeIncluded), true, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithExcludeListWithPorts() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list in a file
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithExcludeListInAFile() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> excludeHosts = new HashSet<String>();
+ excludeHosts.add( "datanodeY");
+ excludeHosts.add( "datanodeZ");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list in a file
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with one of the new nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerWithIncludeList() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> includeHosts = new HashSet<String>();
+ includeHosts.add( "datanodeY");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with one of the new nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerWithIncludeListWithPorts() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with one of the new nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithIncludeList() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> includeHosts = new HashSet<String>();
+ includeHosts.add( "datanodeY");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with one of the new nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithIncludeListWithPorts() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with one of the new nodes in the include list in a file
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithIncludeListInAFile() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> includeHosts = new HashSet<String>();
+ includeHosts.add( "datanodeY");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with one of the new nodes in the include list in a file
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
+ }
+
+ /**
* @param args
*/
public static void main(String[] args) throws Exception {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java Tue Aug 19 23:49:39 2014
@@ -44,7 +44,7 @@ public class TestBalancerWithHANameNodes
ClientProtocol client;
static {
- Balancer.setBlockMoveWaitTime(1000L);
+ Dispatcher.setBlockMoveWaitTime(1000L);
}
/**
@@ -97,10 +97,10 @@ public class TestBalancerWithHANameNodes
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
- assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
- cluster);
+ cluster, Balancer.Parameters.DEFAULT);
} finally {
cluster.shutdown();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java Tue Aug 19 23:49:39 2014
@@ -73,7 +73,7 @@ public class TestBalancerWithMultipleNam
private static final Random RANDOM = new Random();
static {
- Balancer.setBlockMoveWaitTime(1000L) ;
+ Dispatcher.setBlockMoveWaitTime(1000L) ;
}
/** Common objects used in various methods. */
@@ -159,8 +159,8 @@ public class TestBalancerWithMultipleNam
// start rebalancing
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
- Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
+ Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
LOG.info("BALANCER 2");
wait(s.clients, totalUsed, totalCapacity);
@@ -195,7 +195,7 @@ public class TestBalancerWithMultipleNam
balanced = true;
for(int d = 0; d < used.length; d++) {
final double p = used[d]*100.0/cap[d];
- balanced = p <= avg + Balancer.Parameters.DEFALUT.threshold;
+ balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold;
if (!balanced) {
if (i % 100 == 0) {
LOG.warn("datanodes " + d + " is not yet balanced: "
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Tue Aug 19 23:49:39 2014
@@ -22,8 +22,9 @@ import static org.junit.Assert.assertEqu
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@@ -39,6 +40,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
import org.apache.hadoop.net.NetworkTopology;
@@ -53,7 +57,7 @@ public class TestBalancerWithNodeGroup {
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
- final private static long CAPACITY = 6000L;
+ final private static long CAPACITY = 5000L;
final private static String RACK0 = "/rack0";
final private static String RACK1 = "/rack1";
final private static String NODEGROUP0 = "/nodegroup0";
@@ -71,12 +75,13 @@ public class TestBalancerWithNodeGroup {
static final int DEFAULT_BLOCK_SIZE = 100;
static {
- Balancer.setBlockMoveWaitTime(1000L) ;
+ Dispatcher.setBlockMoveWaitTime(1000L) ;
}
static Configuration createConf() {
Configuration conf = new HdfsConfiguration();
TestBalancer.initConf(conf);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
NetworkTopologyWithNodeGroup.class.getName());
conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
@@ -170,8 +175,8 @@ public class TestBalancerWithNodeGroup {
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
- assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
waitForHeartBeat(totalUsedSpace, totalCapacity);
LOG.info("Rebalancing with default factor.");
@@ -184,13 +189,26 @@ public class TestBalancerWithNodeGroup {
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
- Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
- (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+ Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
+ (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
waitForHeartBeat(totalUsedSpace, totalCapacity);
LOG.info("Rebalancing with default factor.");
}
+ private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack) {
+ Set<ExtendedBlock> ret = new HashSet<ExtendedBlock>();
+ for (LocatedBlock blk : blks) {
+ for (DatanodeInfo di : blk.getLocations()) {
+ if (rack.equals(NetworkTopology.getFirstHalf(di.getNetworkLocation()))) {
+ ret.add(blk.getBlock());
+ break;
+ }
+ }
+ }
+ return ret;
+ }
+
/**
* Create a cluster with even distribution, and a new empty node is added to
* the cluster, then test rack locality for balancer policy.
@@ -220,9 +238,14 @@ public class TestBalancerWithNodeGroup {
// fill up the cluster to be 30% full
long totalUsedSpace = totalCapacity * 3 / 10;
- TestBalancer.createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+ long length = totalUsedSpace / numOfDatanodes;
+ TestBalancer.createFile(cluster, filePath, length,
(short) numOfDatanodes, 0);
+ LocatedBlocks lbs = client.getBlockLocations(filePath.toUri().getPath(), 0,
+ length);
+ Set<ExtendedBlock> before = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
+
long newCapacity = CAPACITY;
String newRack = RACK1;
String newNodeGroup = NODEGROUP2;
@@ -235,22 +258,9 @@ public class TestBalancerWithNodeGroup {
// run balancer and validate results
runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
- DatanodeInfo[] datanodeReport =
- client.getDatanodeReport(DatanodeReportType.ALL);
-
- Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>();
- for (DatanodeInfo datanode: datanodeReport) {
- String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation());
- int usedCapacity = (int) datanode.getDfsUsed();
-
- if (rackToUsedCapacity.get(rack) != null) {
- rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack));
- } else {
- rackToUsedCapacity.put(rack, usedCapacity);
- }
- }
- assertEquals(rackToUsedCapacity.size(), 2);
- assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1));
+ lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, length);
+ Set<ExtendedBlock> after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
+ assertEquals(before, after);
} finally {
cluster.shutdown();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Tue Aug 19 23:49:39 2014
@@ -101,7 +101,6 @@ public class BlockManagerTestUtil {
}
/**
- * @param blockManager
* @return replication monitor thread instance from block manager.
*/
public static Daemon getReplicationThread(final BlockManager blockManager)
@@ -111,7 +110,6 @@ public class BlockManagerTestUtil {
/**
* Stop the replication monitor thread
- * @param blockManager
*/
public static void stopReplicationThread(final BlockManager blockManager)
throws IOException {
@@ -126,7 +124,6 @@ public class BlockManagerTestUtil {
}
/**
- * @param blockManager
* @return corruptReplicas from block manager
*/
public static CorruptReplicasMap getCorruptReplicas(final BlockManager blockManager){
@@ -135,7 +132,6 @@ public class BlockManagerTestUtil {
}
/**
- * @param blockManager
* @return computed block replication and block invalidation work that can be
* scheduled on data-nodes.
* @throws IOException
@@ -158,7 +154,7 @@ public class BlockManagerTestUtil {
* regardless of invalidation/replication limit configurations.
*
* NB: you may want to set
- * {@link DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to
+ * {@link DFSConfigKeys#DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to
* a high value to ensure that all work is calculated.
*/
public static int computeAllPendingWork(BlockManager bm) {
@@ -200,7 +196,7 @@ public class BlockManagerTestUtil {
/**
* Change whether the block placement policy will prefer the writer's
* local Datanode or not.
- * @param prefer
+ * @param prefer if true, prefer local node
*/
public static void setWritingPrefersLocalNode(
BlockManager bm, boolean prefer) {
@@ -240,8 +236,13 @@ public class BlockManagerTestUtil {
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
String rackLocation, DatanodeStorage storage) {
+ return getDatanodeDescriptor(ipAddr, rackLocation, storage, "host");
+ }
+
+ public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+ String rackLocation, DatanodeStorage storage, String hostname) {
DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
- DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
+ DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
if (storage != null) {
dn.updateStorage(storage);
}
@@ -267,4 +268,14 @@ public class BlockManagerTestUtil {
}
return reports.toArray(StorageReport.EMPTY_ARRAY);
}
+
+ /**
+ * Have DatanodeManager check decommission state.
+ * @param dm the DatanodeManager to manipulate
+ */
+ public static void checkDecommissionState(DatanodeManager dm,
+ DatanodeDescriptor node) {
+ dm.checkDecommissionState(node);
+ }
+
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -29,6 +30,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -42,6 +45,41 @@ public class TestBlockInfo {
private static final Log LOG = LogFactory
.getLog("org.apache.hadoop.hdfs.TestBlockInfo");
+
+ @Test
+ public void testAddStorage() throws Exception {
+ BlockInfo blockInfo = new BlockInfo(3);
+
+ final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
+
+ boolean added = blockInfo.addStorage(storage);
+
+ Assert.assertTrue(added);
+ Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
+ }
+
+
+ @Test
+ public void testReplaceStorage() throws Exception {
+
+ // Create two dummy storages.
+ final DatanodeStorageInfo storage1 = DFSTestUtil.createDatanodeStorageInfo("storageID1", "127.0.0.1");
+ final DatanodeStorageInfo storage2 = new DatanodeStorageInfo(storage1.getDatanodeDescriptor(), new DatanodeStorage("storageID2"));
+ final int NUM_BLOCKS = 10;
+ BlockInfo[] blockInfos = new BlockInfo[NUM_BLOCKS];
+
+ // Create a few dummy blocks and add them to the first storage.
+ for (int i = 0; i < NUM_BLOCKS; ++i) {
+ blockInfos[i] = new BlockInfo(3);
+ storage1.addBlock(blockInfos[i]);
+ }
+
+ // Try to move one of the blocks to a different storage.
+ boolean added = storage2.addBlock(blockInfos[NUM_BLOCKS/2]);
+ Assert.assertThat(added, is(false));
+ Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
+ }
+
@Test
public void testBlockListMoveToHead() throws Exception {
LOG.info("BlockInfo moveToHead tests...");
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Tue Aug 19 23:49:39 2014
@@ -368,7 +368,7 @@ public class TestBlockManager {
DatanodeStorageInfo[] pipeline) throws IOException {
for (int i = 1; i < pipeline.length; i++) {
DatanodeStorageInfo storage = pipeline[i];
- bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null);
+ bm.addBlock(storage, blockInfo, null);
blockInfo.addStorage(storage);
}
}
@@ -549,12 +549,12 @@ public class TestBlockManager {
// send block report, should be processed
reset(node);
- bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
- bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount());
@@ -566,7 +566,7 @@ public class TestBlockManager {
assertEquals(0, ds.getBlockReportCount()); // ready for report again
// send block report, should be processed after restart
reset(node);
- bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount());
}
@@ -595,7 +595,7 @@ public class TestBlockManager {
// send block report while pretending to already have blocks
reset(node);
doReturn(1).when(node).numBlocks();
- bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount());
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Tue Aug 19 23:49:39 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -160,7 +161,8 @@ public class TestBlockTokenWithDFS {
setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
@@ -209,6 +211,8 @@ public class TestBlockTokenWithDFS {
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
conf.setInt("ipc.client.connect.max.retries", 0);
+ // Set short retry timeouts so this test runs faster
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
return conf;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java Tue Aug 19 23:49:39 2014
@@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.junit.Test;
@@ -89,14 +90,14 @@ public class TestCorruptReplicaInfo {
DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor();
DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor();
- crm.addToCorruptReplicasMap(getBlock(0), dn1, "TEST");
+ addToCorruptReplicasMap(crm, getBlock(0), dn1);
assertEquals("Number of corrupt blocks not returning correctly",
1, crm.size());
- crm.addToCorruptReplicasMap(getBlock(1), dn1, "TEST");
+ addToCorruptReplicasMap(crm, getBlock(1), dn1);
assertEquals("Number of corrupt blocks not returning correctly",
2, crm.size());
- crm.addToCorruptReplicasMap(getBlock(1), dn2, "TEST");
+ addToCorruptReplicasMap(crm, getBlock(1), dn2);
assertEquals("Number of corrupt blocks not returning correctly",
2, crm.size());
@@ -109,7 +110,7 @@ public class TestCorruptReplicaInfo {
0, crm.size());
for (Long block_id: block_ids) {
- crm.addToCorruptReplicasMap(getBlock(block_id), dn1, "TEST");
+ addToCorruptReplicasMap(crm, getBlock(block_id), dn1);
}
assertEquals("Number of corrupt blocks not returning correctly",
@@ -127,4 +128,9 @@ public class TestCorruptReplicaInfo {
crm.getCorruptReplicaBlockIds(10, 7L)));
}
+
+ private static void addToCorruptReplicasMap(CorruptReplicasMap crm,
+ Block blk, DatanodeDescriptor dn) {
+ crm.addToCorruptReplicasMap(blk, dn, "TEST", Reason.NONE);
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java Tue Aug 19 23:49:39 2014
@@ -63,16 +63,16 @@ public class TestDatanodeDescriptor {
assertTrue(storages.length > 0);
final String storageID = storages[0].getStorageID();
// add first block
- assertTrue(dd.addBlock(storageID, blk));
+ assertTrue(storages[0].addBlock(blk));
assertEquals(1, dd.numBlocks());
// remove a non-existent block
assertFalse(dd.removeBlock(blk1));
assertEquals(1, dd.numBlocks());
// add an existent block
- assertFalse(dd.addBlock(storageID, blk));
+ assertFalse(storages[0].addBlock(blk));
assertEquals(1, dd.numBlocks());
// add second block
- assertTrue(dd.addBlock(storageID, blk1));
+ assertTrue(storages[0].addBlock(blk1));
assertEquals(2, dd.numBlocks());
// remove first block
assertTrue(dd.removeBlock(blk));
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.DFSTestUti
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Test;
import com.google.common.base.Joiner;
@@ -43,8 +44,10 @@ public class TestPendingDataNodeMessages
@Test
public void testQueues() {
DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
- msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED);
- msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED);
+ DatanodeStorage storage = new DatanodeStorage("STORAGE_ID");
+ DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage);
+ msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
+ msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
assertEquals(2, msgs.count());
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Tue Aug 19 23:49:39 2014
@@ -82,7 +82,7 @@ public class TestReplicationPolicy {
private static NameNode namenode;
private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
- private static DatanodeDescriptor dataNodes[];
+ private static DatanodeDescriptor[] dataNodes;
private static DatanodeStorageInfo[] storages;
// The interval for marking a datanode as stale,
private static final long staleInterval =
@@ -905,49 +905,46 @@ public class TestReplicationPolicy {
*/
@Test
public void testChooseReplicaToDelete() throws Exception {
- List<DatanodeDescriptor> replicaNodeList = new
- ArrayList<DatanodeDescriptor>();
- final Map<String, List<DatanodeDescriptor>> rackMap
- = new HashMap<String, List<DatanodeDescriptor>>();
+ List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
+ final Map<String, List<DatanodeStorageInfo>> rackMap
+ = new HashMap<String, List<DatanodeStorageInfo>>();
dataNodes[0].setRemaining(4*1024*1024);
- replicaNodeList.add(dataNodes[0]);
+ replicaList.add(storages[0]);
dataNodes[1].setRemaining(3*1024*1024);
- replicaNodeList.add(dataNodes[1]);
+ replicaList.add(storages[1]);
dataNodes[2].setRemaining(2*1024*1024);
- replicaNodeList.add(dataNodes[2]);
+ replicaList.add(storages[2]);
dataNodes[5].setRemaining(1*1024*1024);
- replicaNodeList.add(dataNodes[5]);
+ replicaList.add(storages[5]);
// Refresh the last update time for all the datanodes
for (int i = 0; i < dataNodes.length; i++) {
dataNodes[i].setLastUpdate(Time.now());
}
- List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
- List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
- replicator.splitNodesWithRack(
- replicaNodeList, rackMap, first, second);
- // dataNodes[0] and dataNodes[1] are in first set as their rack has two
- // replica nodes, while datanodes[2] and dataNodes[5] are in second set.
+ List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
+ replicator.splitNodesWithRack(replicaList, rackMap, first, second);
+ // storages[0] and storages[1] are in first set as their rack has two
+ // replica nodes, while storages[2] and storages[5] are in second set.
assertEquals(2, first.size());
assertEquals(2, second.size());
- DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
+ DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
null, null, (short)3, first, second);
- // Within first set, dataNodes[1] with less free space
- assertEquals(chosenNode, dataNodes[1]);
+ // Within first set, storages[1] with less free space
+ assertEquals(chosen, storages[1]);
- replicator.adjustSetsWithChosenReplica(
- rackMap, first, second, chosenNode);
+ replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
assertEquals(0, first.size());
assertEquals(3, second.size());
- // Within second set, dataNodes[5] with less free space
- chosenNode = replicator.chooseReplicaToDelete(
+ // Within second set, storages[5] with less free space
+ chosen = replicator.chooseReplicaToDelete(
null, null, (short)2, first, second);
- assertEquals(chosenNode, dataNodes[5]);
+ assertEquals(chosen, storages[5]);
}
/**
@@ -1121,8 +1118,7 @@ public class TestReplicationPolicy {
// Adding this block will increase its current replication, and that will
// remove it from the queue.
bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
- ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0],
- "STORAGE");
+ ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.