Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/29 20:16:38 UTC
svn commit: r1097905 [10/14] - in /hadoop/hdfs/trunk: ./ bin/
src/c++/libhdfs/ src/contrib/hdfsproxy/
src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/j...
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java Fri Apr 29 18:16:32 2011
@@ -46,7 +46,8 @@ public class TestDFSStartupVersions exte
/**
* Writes an INFO log message containing the parameters.
*/
- void log(String label, NodeType nodeType, Integer testCase, StorageInfo version) {
+ void log(String label, NodeType nodeType, Integer testCase,
+ StorageData sd) {
String testCaseLine = "";
if (testCase != null) {
testCaseLine = " testCase="+testCase;
@@ -55,9 +56,26 @@ public class TestDFSStartupVersions exte
LOG.info("***TEST*** " + label + ":"
+ testCaseLine
+ " nodeType="+nodeType
- + " layoutVersion="+version.getLayoutVersion()
- + " namespaceID="+version.getNamespaceID()
- + " fsscTime="+version.getCTime());
+ + " layoutVersion="+sd.storageInfo.getLayoutVersion()
+ + " namespaceID="+sd.storageInfo.getNamespaceID()
+ + " fsscTime="+sd.storageInfo.getCTime()
+ + " clusterID="+sd.storageInfo.getClusterID()
+ + " BlockPoolID="+sd.blockPoolId);
+ }
+
+ /**
+ * Class used for initializing version information for tests
+ */
+ private static class StorageData {
+ private final StorageInfo storageInfo;
+ private final String blockPoolId;
+
+ StorageData(int layoutVersion, int namespaceId, String clusterId,
+ long cTime, String bpid) {
+ storageInfo = new StorageInfo(layoutVersion, namespaceId, clusterId,
+ cTime);
+ blockPoolId = bpid;
+ }
}
/**
@@ -67,7 +85,7 @@ public class TestDFSStartupVersions exte
* {currentNamespaceId,incorrectNamespaceId} X
* {pastFsscTime,currentFsscTime,futureFsscTime}
*/
- private StorageInfo[] initializeVersions() throws Exception {
+ private StorageData[] initializeVersions() throws Exception {
int layoutVersionOld = Storage.LAST_UPGRADABLE_LAYOUT_VERSION;
int layoutVersionCur = UpgradeUtilities.getCurrentLayoutVersion();
int layoutVersionNew = Integer.MIN_VALUE;
@@ -76,26 +94,54 @@ public class TestDFSStartupVersions exte
long fsscTimeOld = Long.MIN_VALUE;
long fsscTimeCur = UpgradeUtilities.getCurrentFsscTime(null);
long fsscTimeNew = Long.MAX_VALUE;
+ String clusterID = "testClusterID";
+ String invalidClusterID = "invalidTestClusterID";
+ String bpid = UpgradeUtilities.getCurrentBlockPoolID(null);
+ String invalidBpid = "invalidBpid";
- return new StorageInfo[] {
- new StorageInfo(layoutVersionOld, namespaceIdCur, fsscTimeOld), // 0
- new StorageInfo(layoutVersionOld, namespaceIdCur, fsscTimeCur), // 1
- new StorageInfo(layoutVersionOld, namespaceIdCur, fsscTimeNew), // 2
- new StorageInfo(layoutVersionOld, namespaceIdOld, fsscTimeOld), // 3
- new StorageInfo(layoutVersionOld, namespaceIdOld, fsscTimeCur), // 4
- new StorageInfo(layoutVersionOld, namespaceIdOld, fsscTimeNew), // 5
- new StorageInfo(layoutVersionCur, namespaceIdCur, fsscTimeOld), // 6
- new StorageInfo(layoutVersionCur, namespaceIdCur, fsscTimeCur), // 7
- new StorageInfo(layoutVersionCur, namespaceIdCur, fsscTimeNew), // 8
- new StorageInfo(layoutVersionCur, namespaceIdOld, fsscTimeOld), // 9
- new StorageInfo(layoutVersionCur, namespaceIdOld, fsscTimeCur), // 10
- new StorageInfo(layoutVersionCur, namespaceIdOld, fsscTimeNew), // 11
- new StorageInfo(layoutVersionNew, namespaceIdCur, fsscTimeOld), // 12
- new StorageInfo(layoutVersionNew, namespaceIdCur, fsscTimeCur), // 13
- new StorageInfo(layoutVersionNew, namespaceIdCur, fsscTimeNew), // 14
- new StorageInfo(layoutVersionNew, namespaceIdOld, fsscTimeOld), // 15
- new StorageInfo(layoutVersionNew, namespaceIdOld, fsscTimeCur), // 16
- new StorageInfo(layoutVersionNew, namespaceIdOld, fsscTimeNew), // 17
+ return new StorageData[] {
+ new StorageData(layoutVersionOld, namespaceIdCur, clusterID,
+ fsscTimeOld, bpid), // 0
+ new StorageData(layoutVersionOld, namespaceIdCur, clusterID,
+ fsscTimeCur, bpid), // 1
+ new StorageData(layoutVersionOld, namespaceIdCur, clusterID,
+ fsscTimeNew, bpid), // 2
+ new StorageData(layoutVersionOld, namespaceIdOld, clusterID,
+ fsscTimeOld, bpid), // 3
+ new StorageData(layoutVersionOld, namespaceIdOld, clusterID,
+ fsscTimeCur, bpid), // 4
+ new StorageData(layoutVersionOld, namespaceIdOld, clusterID,
+ fsscTimeNew, bpid), // 5
+ new StorageData(layoutVersionCur, namespaceIdCur, clusterID,
+ fsscTimeOld, bpid), // 6
+ new StorageData(layoutVersionCur, namespaceIdCur, clusterID,
+ fsscTimeCur, bpid), // 7
+ new StorageData(layoutVersionCur, namespaceIdCur, clusterID,
+ fsscTimeNew, bpid), // 8
+ new StorageData(layoutVersionCur, namespaceIdOld, clusterID,
+ fsscTimeOld, bpid), // 9
+ new StorageData(layoutVersionCur, namespaceIdOld, clusterID,
+ fsscTimeCur, bpid), // 10
+ new StorageData(layoutVersionCur, namespaceIdOld, clusterID,
+ fsscTimeNew, bpid), // 11
+ new StorageData(layoutVersionNew, namespaceIdCur, clusterID,
+ fsscTimeOld, bpid), // 12
+ new StorageData(layoutVersionNew, namespaceIdCur, clusterID,
+ fsscTimeCur, bpid), // 13
+ new StorageData(layoutVersionNew, namespaceIdCur, clusterID,
+ fsscTimeNew, bpid), // 14
+ new StorageData(layoutVersionNew, namespaceIdOld, clusterID,
+ fsscTimeOld, bpid), // 15
+ new StorageData(layoutVersionNew, namespaceIdOld, clusterID,
+ fsscTimeCur, bpid), // 16
+ new StorageData(layoutVersionNew, namespaceIdOld, clusterID,
+ fsscTimeNew, bpid), // 17
+ // Test with invalid clusterId
+ new StorageData(layoutVersionCur, namespaceIdCur, invalidClusterID,
+ fsscTimeCur, bpid), // 18
+ // Test with invalid block pool Id
+ new StorageData(layoutVersionCur, namespaceIdCur, clusterID,
+ fsscTimeCur, invalidBpid) // 19
};
}
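The first 18 entries enumerate the cross product named in the javadoc above, ordered layout-major, namespace-middle, time-minor; entries 18 and 19 add the federation-specific invalid-ID cases. A sketch of the index arithmetic under that ordering (layout in {old=0, cur=1, new=2}, namespace in {cur=0, old=1}, time in {old=0, cur=1, new=2}):

    int index = layout * 6 + namespace * 3 + time;  // yields entries 0..17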
@@ -106,29 +152,52 @@ public class TestDFSStartupVersions exte
* will work together. The rules for compatibility,
* taken from the DFS Upgrade Design, are as follows:
* <pre>
- * 1. The data-node does regular startup (no matter which options
+ * <ol>
+ * <li>Check 0: if Datanode namespaceID != Namenode namespaceID, the startup fails
+ * </li>
+ * <li>Check 1: if Datanode clusterID != Namenode clusterID, the startup fails
+ * </li>
+ * <li>Check 2: if Datanode blockPoolID != Namenode blockPoolID, the startup fails
+ * </li>
+ * <li>Check 3: The data-node does regular startup (no matter which options
* it is started with) if
* softwareLV == storedLV AND
* DataNode.FSSCTime == NameNode.FSSCTime
- * 2. The data-node performs an upgrade if it is started without any
+ * </li>
+ * <li>Check 4: The data-node performs an upgrade if it is started without any
* options and
* |softwareLV| > |storedLV| OR
* (softwareLV == storedLV AND
* DataNode.FSSCTime < NameNode.FSSCTime)
- * 3. NOT TESTED: The data-node rolls back if it is started with
+ * </li>
+ * <li>NOT TESTED: The data-node rolls back if it is started with
* the -rollback option and
* |softwareLV| >= |previous.storedLV| AND
* DataNode.previous.FSSCTime <= NameNode.FSSCTime
- * 4. In all other cases the startup fails.
+ * </li>
+ * <li>Check 5: In all other cases the startup fails.</li>
+ * </ol>
* </pre>
*/
- boolean isVersionCompatible(StorageInfo namenodeVer, StorageInfo datanodeVer) {
+ boolean isVersionCompatible(StorageData namenodeSd, StorageData datanodeSd) {
+ final StorageInfo namenodeVer = namenodeSd.storageInfo;
+ final StorageInfo datanodeVer = datanodeSd.storageInfo;
// check #0
if (namenodeVer.getNamespaceID() != datanodeVer.getNamespaceID()) {
LOG.info("namespaceIDs are not equal: isVersionCompatible=false");
return false;
}
// check #1
+ if (!namenodeVer.getClusterID().equals(datanodeVer.getClusterID())) {
+ LOG.info("clusterIDs are not equal: isVersionCompatible=false");
+ return false;
+ }
+ // check #2
+ if (!namenodeSd.blockPoolId.equals(datanodeSd.blockPoolId)) {
+ LOG.info("blockPoolIDs are not equal: isVersionCompatible=false");
+ return false;
+ }
+ // check #3
int softwareLV = FSConstants.LAYOUT_VERSION; // will also be Namenode's LV
int storedLV = datanodeVer.getLayoutVersion();
if (softwareLV == storedLV &&
@@ -137,7 +206,7 @@ public class TestDFSStartupVersions exte
LOG.info("layoutVersions and cTimes are equal: isVersionCompatible=true");
return true;
}
- // check #2
+ // check #4
long absSoftwareLV = Math.abs((long)softwareLV);
long absStoredLV = Math.abs((long)storedLV);
if (absSoftwareLV > absStoredLV ||
@@ -147,7 +216,7 @@ public class TestDFSStartupVersions exte
LOG.info("softwareLayoutVersion is newer OR namenode cTime is newer: isVersionCompatible=true");
return true;
}
- // check #4
+ // check #5
LOG.info("default case: isVersionCompatible=false");
return false;
}
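Distilled, the six checks above reduce to a single predicate. A minimal standalone sketch, assuming the StorageData class added by this patch and FSConstants.LAYOUT_VERSION as the software layout version:

    boolean compatible(StorageData nn, StorageData dn) {
      if (nn.storageInfo.getNamespaceID() != dn.storageInfo.getNamespaceID())
        return false;                                    // check 0
      if (!nn.storageInfo.getClusterID().equals(dn.storageInfo.getClusterID()))
        return false;                                    // check 1
      if (!nn.blockPoolId.equals(dn.blockPoolId))
        return false;                                    // check 2
      int softwareLV = FSConstants.LAYOUT_VERSION;
      int storedLV = dn.storageInfo.getLayoutVersion();
      if (softwareLV == storedLV
          && dn.storageInfo.getCTime() == nn.storageInfo.getCTime())
        return true;                                     // check 3: regular startup
      if (Math.abs((long) softwareLV) > Math.abs((long) storedLV)
          || (softwareLV == storedLV
              && dn.storageInfo.getCTime() < nn.storageInfo.getCTime()))
        return true;                                     // check 4: upgrade
      return false;                                      // check 5: all other cases fail
    }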
@@ -170,25 +239,30 @@ public class TestDFSStartupVersions exte
UpgradeUtilities.initialize();
Configuration conf = UpgradeUtilities.initializeStorageStateConf(1,
new HdfsConfiguration());
- StorageInfo[] versions = initializeVersions();
- UpgradeUtilities.createStorageDirs(
- NAME_NODE, conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY), "current");
+ StorageData[] versions = initializeVersions();
+ UpgradeUtilities.createNameNodeStorageDirs(
+ conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY), "current");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.REGULAR)
.build();
- StorageInfo nameNodeVersion = new StorageInfo(
- UpgradeUtilities.getCurrentLayoutVersion(),
- UpgradeUtilities.getCurrentNamespaceID(cluster),
- UpgradeUtilities.getCurrentFsscTime(cluster));
+ StorageData nameNodeVersion = new StorageData(
+ UpgradeUtilities.getCurrentLayoutVersion(),
+ UpgradeUtilities.getCurrentNamespaceID(cluster),
+ UpgradeUtilities.getCurrentClusterID(cluster),
+ UpgradeUtilities.getCurrentFsscTime(cluster),
+ UpgradeUtilities.getCurrentBlockPoolID(cluster));
+
log("NameNode version info", NAME_NODE, null, nameNodeVersion);
+ String bpid = UpgradeUtilities.getCurrentBlockPoolID(cluster);
for (int i = 0; i < versions.length; i++) {
- File[] storage = UpgradeUtilities.createStorageDirs(
- DATA_NODE, conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
+ File[] storage = UpgradeUtilities.createDataNodeStorageDirs(
+ conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
log("DataNode version info", DATA_NODE, i, versions[i]);
- UpgradeUtilities.createVersionFile(conf, DATA_NODE, storage, versions[i]);
+ UpgradeUtilities.createDataNodeVersionFile(storage,
+ versions[i].storageInfo, bpid, versions[i].blockPoolId);
try {
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
} catch (Exception ignore) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java Fri Apr 29 18:16:32 2011
@@ -23,7 +23,7 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
+import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import static org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType.NAME_NODE;
@@ -102,88 +102,197 @@ public class TestDFSStorageStateRecovery
+ " previous="+state[1]
+ " previous.tmp="+state[2]
+ " removed.tmp="+state[3]
- + " lastcheckpoint.tmp="+state[4]);
+ + " lastcheckpoint.tmp="+state[4]
+ + " should recover="+state[5]
+ + " current exists after="+state[6]
+ + " previous exists after="+state[7]);
}
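Each row of the testCases table (defined elsewhere in this class) is an eight-flag boolean array whose last three flags carry the expected outcome logged above. A hypothetical row under that convention:

    // {current, previous, previous.tmp, removed.tmp, lastcheckpoint.tmp,
    //  should recover, current exists after, previous exists after}
    boolean[] testCase = {true, false, true, false, false, true, true, false};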
/**
- * Sets up the storage directories for the given node type, either
- * dfs.name.dir or dfs.data.dir. For each element in dfs.name.dir or
- * dfs.data.dir, the subdirectories represented by the first four elements
- * of the <code>state</code> array will be created and populated.
- * See UpgradeUtilities.createStorageDirs().
+ * Sets up the storage directories for namenode as defined by
+ * dfs.name.dir. For each element in dfs.name.dir, the subdirectories
+ * represented by the first five elements of the <code>state</code> array
+ * will be created and populated.
+ *
+ * See {@link UpgradeUtilities#createNameNodeStorageDirs()}
*
- * @param nodeType
- * the type of node that storage should be created for. Based on this
- * parameter either dfs.name.dir or dfs.data.dir is used from the global conf.
* @param state
* a row from the testCases table which indicates which directories
* to setup for the node
- * @return file paths representing either dfs.name.dir or dfs.data.dir
- * directories
+ * @return file paths representing namenode storage directories
*/
- String[] createStorageState(NodeType nodeType, boolean[] state) throws Exception {
- String[] baseDirs = (nodeType == NAME_NODE ?
- conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY) :
- conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+ String[] createNameNodeStorageState(boolean[] state) throws Exception {
+ String[] baseDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
UpgradeUtilities.createEmptyDirs(baseDirs);
if (state[0]) // current
- UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "current");
if (state[1]) // previous
- UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "previous");
if (state[2]) // previous.tmp
- UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "previous.tmp");
+ UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "previous.tmp");
if (state[3]) // removed.tmp
- UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "removed.tmp");
+ UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "removed.tmp");
if (state[4]) // lastcheckpoint.tmp
- UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "lastcheckpoint.tmp");
+ UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "lastcheckpoint.tmp");
return baseDirs;
}
-
+
+ /**
+ * Sets up the storage directories for a datanode under
+ * dfs.data.dir. For each element in dfs.data.dir, the subdirectories
+ * represented by the first five elements of the <code>state</code> array
+ * will be created and populated.
+ * See {@link UpgradeUtilities#createDataNodeStorageDirs()}
+ *
+ * @param state
+ * a row from the testCases table which indicates which directories
+ * to setup for the node
+ * @return file paths representing datanode storage directories
+ */
+ String[] createDataNodeStorageState(boolean[] state) throws Exception {
+ String[] baseDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+ UpgradeUtilities.createEmptyDirs(baseDirs);
+ if (state[0]) // current
+ UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "current");
+ if (state[1]) // previous
+ UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "previous");
+ if (state[2]) // previous.tmp
+ UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "previous.tmp");
+ if (state[3]) // removed.tmp
+ UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "removed.tmp");
+ if (state[4]) // lastcheckpoint.tmp
+ UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "lastcheckpoint.tmp");
+ return baseDirs;
+ }
+
+ /**
+ * Sets up the storage directories for a block pool under
+ * dfs.data.dir. For each element in dfs.data.dir, the subdirectories
+ * represented by the first five elements of the <code>state</code> array
+ * will be created and populated.
+ * See {@link UpgradeUtilities#createBlockPoolStorageDirs()}
+ *
+ * @param bpid block pool Id
+ * @param state
+ * a row from the testCases table which indicates which directories
+ * to setup for the node
+ * @return file paths representing block pool storage directories
+ */
+ String[] createBlockPoolStorageState(String bpid, boolean[] state) throws Exception {
+ String[] baseDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+ UpgradeUtilities.createEmptyDirs(baseDirs);
+ UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "current");
+
+ // After copying the storage directories from the master datanode, empty
+ // the block pool storage directories
+ String[] bpDirs = UpgradeUtilities.createEmptyBPDirs(baseDirs, bpid);
+ if (state[0]) // current
+ UpgradeUtilities.createBlockPoolStorageDirs(baseDirs, "current", bpid);
+ if (state[1]) // previous
+ UpgradeUtilities.createBlockPoolStorageDirs(baseDirs, "previous", bpid);
+ if (state[2]) // previous.tmp
+ UpgradeUtilities.createBlockPoolStorageDirs(baseDirs, "previous.tmp",
+ bpid);
+ if (state[3]) // removed.tmp
+ UpgradeUtilities
+ .createBlockPoolStorageDirs(baseDirs, "removed.tmp", bpid);
+ if (state[4]) // lastcheckpoint.tmp
+ UpgradeUtilities.createBlockPoolStorageDirs(baseDirs,
+ "lastcheckpoint.tmp", bpid);
+ return bpDirs;
+ }
+
/**
- * Verify that the current and/or previous exist as indicated by
+ * For NameNode, verify that the current and/or previous exist as indicated by
* the method parameters. If previous exists, verify that
* it hasn't been modified by comparing the checksum of all the files
* it contains with their original checksums. It is assumed that
* the server has recovered.
*/
- void checkResult(NodeType nodeType, String[] baseDirs,
+ void checkResultNameNode(String[] baseDirs,
boolean currentShouldExist, boolean previousShouldExist)
throws IOException
{
- switch (nodeType) {
- case NAME_NODE:
- if (currentShouldExist) {
- for (int i = 0; i < baseDirs.length; i++) {
- assertTrue(new File(baseDirs[i],"current").isDirectory());
- assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
- assertTrue(new File(baseDirs[i],"current/edits").isFile());
- assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
- assertTrue(new File(baseDirs[i],"current/fstime").isFile());
- }
- }
- break;
- case DATA_NODE:
- if (currentShouldExist) {
- for (int i = 0; i < baseDirs.length; i++) {
- assertEquals(
- UpgradeUtilities.checksumContents(
- nodeType, new File(baseDirs[i],"current")),
- UpgradeUtilities.checksumMasterContents(nodeType));
- }
+ if (currentShouldExist) {
+ for (int i = 0; i < baseDirs.length; i++) {
+ assertTrue(new File(baseDirs[i],"current").isDirectory());
+ assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
+ assertTrue(new File(baseDirs[i],"current/edits").isFile());
+ assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
+ assertTrue(new File(baseDirs[i],"current/fstime").isFile());
}
- break;
}
if (previousShouldExist) {
for (int i = 0; i < baseDirs.length; i++) {
assertTrue(new File(baseDirs[i],"previous").isDirectory());
assertEquals(
UpgradeUtilities.checksumContents(
- nodeType, new File(baseDirs[i],"previous")),
- UpgradeUtilities.checksumMasterContents(nodeType));
+ NAME_NODE, new File(baseDirs[i],"previous")),
+ UpgradeUtilities.checksumMasterNameNodeContents());
+ }
+ }
+ }
+
+ /**
+ * For the datanode, verify that the current and/or previous exist as indicated
+ * by the method parameters. If previous exists, verify that it hasn't been
+ * modified by comparing the checksum of all the files it contains with their
+ * original checksums. It is assumed that the server has recovered.
+ */
+ void checkResultDataNode(String[] baseDirs,
+ boolean currentShouldExist, boolean previousShouldExist)
+ throws IOException
+ {
+ if (currentShouldExist) {
+ for (int i = 0; i < baseDirs.length; i++) {
+ assertEquals(
+ UpgradeUtilities.checksumContents(DATA_NODE, new File(baseDirs[i],"current")),
+ UpgradeUtilities.checksumMasterDataNodeContents());
+ }
+ }
+ if (previousShouldExist) {
+ for (int i = 0; i < baseDirs.length; i++) {
+ assertTrue(new File(baseDirs[i],"previous").isDirectory());
+ assertEquals(
+ UpgradeUtilities.checksumContents(DATA_NODE, new File(baseDirs[i],"previous")),
+ UpgradeUtilities.checksumMasterDataNodeContents());
}
}
}
+ /**
+ * For a block pool, verify that the current and/or previous exist as indicated
+ * by the method parameters. If previous exists, verify that it hasn't been
+ * modified by comparing the checksum of all the files it contains with their
+ * original checksums. It is assumed that the server has recovered.
+ * @param baseDirs directories pointing to block pool storage
+ * @param currentShouldExist current directory exists under storage
+ * @param previousShouldExist previous directory exists under storage
+ */
+ void checkResultBlockPool(String[] baseDirs, boolean currentShouldExist,
+ boolean previousShouldExist) throws IOException
+ {
+ if (currentShouldExist) {
+ for (int i = 0; i < baseDirs.length; i++) {
+ File bpCurDir = new File(baseDirs[i], Storage.STORAGE_DIR_CURRENT);
+ assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurDir),
+ UpgradeUtilities.checksumMasterBlockPoolContents());
+ }
+ }
+ if (previousShouldExist) {
+ for (int i = 0; i < baseDirs.length; i++) {
+ File bpPrevDir = new File(baseDirs[i], Storage.STORAGE_DIR_PREVIOUS);
+ assertTrue(bpPrevDir.isDirectory());
+ assertEquals(
+ UpgradeUtilities.checksumContents(DATA_NODE, bpPrevDir),
+ UpgradeUtilities.checksumMasterBlockPoolContents());
+ }
+ }
+ }
+
private MiniDFSCluster createCluster(Configuration c) throws IOException {
return new MiniDFSCluster.Builder(c)
.numDataNodes(0)
@@ -211,10 +320,10 @@ public class TestDFSStorageStateRecovery
boolean prevAfterRecover = testCase[7];
log("NAME_NODE recovery", numDirs, i, testCase);
- baseDirs = createStorageState(NAME_NODE, testCase);
+ baseDirs = createNameNodeStorageState(testCase);
if (shouldRecover) {
cluster = createCluster(conf);
- checkResult(NAME_NODE, baseDirs, curAfterRecover, prevAfterRecover);
+ checkResultNameNode(baseDirs, curAfterRecover, prevAfterRecover);
cluster.shutdown();
} else {
try {
@@ -237,12 +346,13 @@ public class TestDFSStorageStateRecovery
}
/**
- * This test iterates over the testCases table and attempts
- * to startup the DataNode normally.
+ * This test iterates over the testCases table for Datanode storage and
+ * attempts to start up the DataNode normally.
*/
public void testDNStorageStates() throws Exception {
String[] baseDirs;
+ // First setup the datanode storage directory
for (int numDirs = 1; numDirs <= 2; numDirs++) {
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
@@ -254,21 +364,66 @@ public class TestDFSStorageStateRecovery
boolean prevAfterRecover = testCase[7];
log("DATA_NODE recovery", numDirs, i, testCase);
- createStorageState(NAME_NODE,
- new boolean[] {true, true, false, false, false});
+ createNameNodeStorageState(new boolean[] { true, true, false, false,
+ false });
cluster = createCluster(conf);
- baseDirs = createStorageState(DATA_NODE, testCase);
+ baseDirs = createDataNodeStorageState(testCase);
if (!testCase[0] && !testCase[1] && !testCase[2] && !testCase[3]) {
// DataNode will create and format current if no directories exist
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
} else {
if (shouldRecover) {
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
- checkResult(DATA_NODE, baseDirs, curAfterRecover, prevAfterRecover);
+ checkResultDataNode(baseDirs, curAfterRecover, prevAfterRecover);
} else {
try {
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
- throw new AssertionError("DataNode should have failed to start");
+ assertFalse(cluster.getDataNodes().get(0).isDatanodeUp());
+ } catch (Exception expected) {
+ // expected
+ }
+ }
+ }
+ cluster.shutdown();
+ } // end testCases loop
+ } // end numDirs loop
+ }
+
+ /**
+ * This test iterates over the testCases table for block pool storage and
+ * attempts to start up the DataNode normally.
+ */
+ public void testBlockPoolStorageStates() throws Exception {
+ String[] baseDirs;
+
+ // First setup the datanode storage directory
+ String bpid = UpgradeUtilities.getCurrentBlockPoolID(null);
+ for (int numDirs = 1; numDirs <= 2; numDirs++) {
+ conf = new HdfsConfiguration();
+ conf.setInt("dfs.datanode.scan.period.hours", -1);
+ conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
+ for (int i = 0; i < NUM_DN_TEST_CASES; i++) {
+ boolean[] testCase = testCases[i];
+ boolean shouldRecover = testCase[5];
+ boolean curAfterRecover = testCase[6];
+ boolean prevAfterRecover = testCase[7];
+
+ log("BLOCK_POOL recovery", numDirs, i, testCase);
+ createNameNodeStorageState(new boolean[] { true, true, false, false,
+ false });
+ cluster = createCluster(conf);
+ baseDirs = createBlockPoolStorageState(bpid, testCase);
+ if (!testCase[0] && !testCase[1] && !testCase[2] && !testCase[3]) {
+ // DataNode will create and format current if no directories exist
+ cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
+ } else {
+ if (shouldRecover) {
+ cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
+ checkResultBlockPool(baseDirs, curAfterRecover, prevAfterRecover);
+ } else {
+ try {
+ cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
+ assertFalse(cluster.getDataNodes().get(0).isBPServiceAlive(bpid));
} catch (Exception expected) {
// expected
}
@@ -288,13 +443,4 @@ public class TestDFSStorageStateRecovery
LOG.info("Shutting down MiniDFSCluster");
if (cluster != null) cluster.shutdown();
}
-
- public static void main(String[] args) throws Exception {
- TestDFSStorageStateRecovery test = new TestDFSStorageStateRecovery();
- test.testNNStorageStates();
- test.testDNStorageStates();
- }
-
-}
-
-
+}
\ No newline at end of file
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java Fri Apr 29 18:16:32 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -61,41 +60,45 @@ public class TestDFSUpgrade {
}
/**
- * Verify that the current and previous directories exist. Verify that
- * previous hasn't been modified by comparing the checksum of all it's
- * containing files with their original checksum. It is assumed that
- * the server has recovered and upgraded.
- */
- void checkResult(NodeType nodeType, String[] baseDirs) throws IOException {
- switch (nodeType) {
- case NAME_NODE:
- for (int i = 0; i < baseDirs.length; i++) {
- assertTrue(new File(baseDirs[i],"current").isDirectory());
- assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
- assertTrue(new File(baseDirs[i],"current/edits").isFile());
- assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
- assertTrue(new File(baseDirs[i],"current/fstime").isFile());
- }
- break;
- case DATA_NODE:
- for (int i = 0; i < baseDirs.length; i++) {
- assertEquals(
- UpgradeUtilities.checksumContents(
- nodeType, new File(baseDirs[i],"current")),
- UpgradeUtilities.checksumMasterContents(nodeType));
- }
- break;
- }
+ * For the namenode, verify that the current and previous directories exist.
+ * Verify that previous hasn't been modified by comparing the checksum of all
+ * its files with their original checksums. It is assumed that the
+ * server has recovered and upgraded.
+ */
+ void checkNameNode(String[] baseDirs) throws IOException {
for (int i = 0; i < baseDirs.length; i++) {
- assertTrue(new File(baseDirs[i],"previous").isDirectory());
- assertEquals(
- UpgradeUtilities.checksumContents(
- nodeType, new File(baseDirs[i],"previous")),
- UpgradeUtilities.checksumMasterContents(nodeType));
+ assertTrue(new File(baseDirs[i],"current").isDirectory());
+ assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
+ assertTrue(new File(baseDirs[i],"current/edits").isFile());
+ assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
+ assertTrue(new File(baseDirs[i],"current/fstime").isFile());
+
+ File previous = new File(baseDirs[i], "previous");
+ assertTrue(previous.isDirectory());
+ assertEquals(UpgradeUtilities.checksumContents(NAME_NODE, previous),
+ UpgradeUtilities.checksumMasterNameNodeContents());
}
}
/**
+ * For a datanode block pool, verify that the current and previous
+ * directories exist. Verify that previous hasn't been modified by comparing
+ * the checksum of all its files with their original checksums. It
+ * is assumed that the server has recovered and upgraded.
+ */
+ void checkDataNode(String[] baseDirs, String bpid) throws IOException {
+ for (int i = 0; i < baseDirs.length; i++) {
+ File current = new File(baseDirs[i], "current/" + bpid + "/current");
+ assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, current),
+ UpgradeUtilities.checksumMasterDataNodeContents());
+
+ File previous = new File(baseDirs[i], "current/" + bpid + "/previous");
+ assertTrue(previous.isDirectory());
+ assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, previous),
+ UpgradeUtilities.checksumMasterDataNodeContents());
+ }
+ }
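The nested paths above reflect the federated datanode layout this patch targets: each datanode storage directory now holds per-block-pool trees, roughly

    <dfs.data.dir>/current/VERSION
    <dfs.data.dir>/current/<bpid>/current/...
    <dfs.data.dir>/current/<bpid>/previous/...   (present after an upgrade)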
+ /**
* Attempts to start a NameNode with the given operation. Starting
* the NameNode should throw an exception.
*/
@@ -114,17 +117,16 @@ public class TestDFSUpgrade {
}
/**
- * Attempts to start a DataNode with the given operation. Starting
- * the DataNode should throw an exception.
+ * Attempts to start a DataNode with the given operation. Starting
+ * the given block pool should fail.
+ * @param operation startup option
+ * @param bpid block pool Id that should fail to start
+ * @throws IOException
*/
- void startDataNodeShouldFail(StartupOption operation) {
- try {
- cluster.startDataNodes(conf, 1, false, operation, null); // should fail
- throw new AssertionError("DataNode should have failed to start");
- } catch (Exception expected) {
- // expected
- assertFalse(cluster.isDataNodeUp());
- }
+ void startBlockPoolShouldFail(StartupOption operation, String bpid) throws IOException {
+ cluster.startDataNodes(conf, 1, false, operation, null); // should fail
+ assertFalse("Block pool " + bpid + " should have failed to start",
+ cluster.getDataNodes().get(0).isBPServiceAlive(bpid));
}
/**
@@ -149,6 +151,7 @@ public class TestDFSUpgrade {
File[] baseDirs;
UpgradeUtilities.initialize();
+ StorageInfo storageInfo = null;
for (int numDirs = 1; numDirs <= 2; numDirs++) {
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
@@ -157,67 +160,76 @@ public class TestDFSUpgrade {
String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
log("Normal NameNode upgrade", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
cluster = createCluster();
- checkResult(NAME_NODE, nameNodeDirs);
+ checkNameNode(nameNodeDirs);
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("Normal DataNode upgrade", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
cluster = createCluster();
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
- checkResult(DATA_NODE, dataNodeDirs);
+ checkDataNode(dataNodeDirs, UpgradeUtilities.getCurrentBlockPoolID(null));
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
log("NameNode upgrade with existing previous dir", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
startNameNodeShouldFail(StartupOption.UPGRADE);
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("DataNode upgrade with existing previous dir", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
cluster = createCluster();
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
- checkResult(DATA_NODE, dataNodeDirs);
+ checkDataNode(dataNodeDirs, UpgradeUtilities.getCurrentBlockPoolID(null));
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
log("DataNode upgrade with future stored layout version in current", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
cluster = createCluster();
- baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
- UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
- new StorageInfo(Integer.MIN_VALUE,
- UpgradeUtilities.getCurrentNamespaceID(cluster),
- UpgradeUtilities.getCurrentFsscTime(cluster)));
- startDataNodeShouldFail(StartupOption.REGULAR);
+ baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+ storageInfo = new StorageInfo(Integer.MIN_VALUE,
+ UpgradeUtilities.getCurrentNamespaceID(cluster),
+ UpgradeUtilities.getCurrentClusterID(cluster),
+ UpgradeUtilities.getCurrentFsscTime(cluster));
+
+ UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
+ UpgradeUtilities.getCurrentBlockPoolID(cluster));
+
+ startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
+ .getCurrentBlockPoolID(null));
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
log("DataNode upgrade with newer fsscTime in current", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
cluster = createCluster();
- baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
- UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
- new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(),
- UpgradeUtilities.getCurrentNamespaceID(cluster),
- Long.MAX_VALUE));
- startDataNodeShouldFail(StartupOption.REGULAR);
+ baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+ storageInfo = new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(),
+ UpgradeUtilities.getCurrentNamespaceID(cluster),
+ UpgradeUtilities.getCurrentClusterID(cluster), Long.MAX_VALUE);
+
+ UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
+ UpgradeUtilities.getCurrentBlockPoolID(cluster));
+ // Ensure the corresponding block pool failed to initialize
+ startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
+ .getCurrentBlockPoolID(null));
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
log("NameNode upgrade with no edits file", numDirs);
- baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
for (File f : baseDirs) {
FileUtil.fullyDelete(new File(f,"edits"));
}
@@ -225,7 +237,7 @@ public class TestDFSUpgrade {
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode upgrade with no image file", numDirs);
- baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
for (File f : baseDirs) {
FileUtil.fullyDelete(new File(f,"fsimage"));
}
@@ -233,7 +245,7 @@ public class TestDFSUpgrade {
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode upgrade with corrupt version file", numDirs);
- baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
for (File f : baseDirs) {
UpgradeUtilities.corruptFile(new File(f,"VERSION"));
}
@@ -241,20 +253,28 @@ public class TestDFSUpgrade {
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode upgrade with old layout version in current", numDirs);
- baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- UpgradeUtilities.createVersionFile(conf, NAME_NODE, baseDirs,
- new StorageInfo(Storage.LAST_UPGRADABLE_LAYOUT_VERSION + 1,
- UpgradeUtilities.getCurrentNamespaceID(null),
- UpgradeUtilities.getCurrentFsscTime(null)));
+ baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ storageInfo = new StorageInfo(Storage.LAST_UPGRADABLE_LAYOUT_VERSION + 1,
+ UpgradeUtilities.getCurrentNamespaceID(null),
+ UpgradeUtilities.getCurrentClusterID(null),
+ UpgradeUtilities.getCurrentFsscTime(null));
+
+ UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs, storageInfo,
+ UpgradeUtilities.getCurrentBlockPoolID(cluster));
+
startNameNodeShouldFail(StartupOption.UPGRADE);
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode upgrade with future layout version in current", numDirs);
- baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- UpgradeUtilities.createVersionFile(conf, NAME_NODE, baseDirs,
- new StorageInfo(Integer.MIN_VALUE,
- UpgradeUtilities.getCurrentNamespaceID(null),
- UpgradeUtilities.getCurrentFsscTime(null)));
+ baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ storageInfo = new StorageInfo(Integer.MIN_VALUE,
+ UpgradeUtilities.getCurrentNamespaceID(null),
+ UpgradeUtilities.getCurrentClusterID(null),
+ UpgradeUtilities.getCurrentFsscTime(null));
+
+ UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs, storageInfo,
+ UpgradeUtilities.getCurrentBlockPoolID(cluster));
+
startNameNodeShouldFail(StartupOption.UPGRADE);
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
} // end numDir loop
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java Fri Apr 29 18:16:32 2011
@@ -189,6 +189,7 @@ public class TestDFSUpgradeFromImage ext
.numDataNodes(numDataNodes)
.format(false)
.startupOption(StartupOption.UPGRADE)
+ .clusterId("testClusterId")
.build();
cluster.waitActive();
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUtil.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUtil.java Fri Apr 29 18:16:32 2011
@@ -28,7 +28,7 @@ import java.util.List;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.fs.BlockLocation;
@@ -43,11 +43,11 @@ public class TestDFSUtil {
ds[0] = d;
// ok
- Block b1 = new Block(1, 1, 1);
+ ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 1, 1);
LocatedBlock l1 = new LocatedBlock(b1, ds, 0, false);
// corrupt
- Block b2 = new Block(2, 1, 1);
+ ExtendedBlock b2 = new ExtendedBlock("bpid", 2, 1, 1);
LocatedBlock l2 = new LocatedBlock(b2, ds, 0, true);
List<LocatedBlock> ls = Arrays.asList(l1, l2);
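The Block-to-ExtendedBlock substitution here recurs throughout this commit: an ExtendedBlock pairs a block pool ID with the familiar block fields. A minimal sketch of the pattern, argument order as in the hunk above:

    // (poolId, blockId, numBytes, generationStamp)
    ExtendedBlock b = new ExtendedBlock("bpid", 1, 1, 1);
    String poolId = b.getBlockPoolId();  // federation-aware identity
    Block local = b.getLocalBlock();     // pool-agnostic view, used elsewhere in this commit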
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Apr 29 18:16:32 2011
@@ -45,14 +45,15 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
@@ -146,7 +147,7 @@ public class TestDataTransferProtocol ex
in.readFully(arr);
}
- private void writeZeroLengthPacket(Block block, String description)
+ private void writeZeroLengthPacket(ExtendedBlock block, String description)
throws IOException {
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512); // checksum size
@@ -167,12 +168,12 @@ public class TestDataTransferProtocol ex
sendRecvData(description, false);
}
- private void testWrite(Block block, BlockConstructionStage stage, long newGS,
+ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS,
String description, Boolean eofExcepted) throws IOException {
sendBuf.reset();
recvBuf.reset();
- DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0, stage, newGS,
- block.getNumBytes(), block.getNumBytes(), "cl", null,
+ DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0,
+ stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
if (eofExcepted) {
ERROR.write(recvOut);
@@ -194,7 +195,9 @@ public class TestDataTransferProtocol ex
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
try {
cluster.waitActive();
- datanode = cluster.getDataNodes().get(0).dnRegistration;
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ datanode = DataNodeTestUtils.getDNRegistrationForBP(
+ cluster.getDataNodes().get(0), poolId);
dnAddr = NetUtils.createSocketAddr(datanode.getName());
FileSystem fileSys = cluster.getFileSystem();
@@ -202,7 +205,7 @@ public class TestDataTransferProtocol ex
Path file = new Path("dataprotocol.dat");
DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
// get the first blockid for the file
- Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
// test PIPELINE_SETUP_CREATE on a finalized block
testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
"Cannot create an existing block", true);
@@ -240,8 +243,8 @@ public class TestDataTransferProtocol ex
/* Test writing to a new block */
long newBlockId = firstBlock.getBlockId() + 1;
- Block newBlock = new Block(newBlockId, 0,
- firstBlock.getGenerationStamp());
+ ExtendedBlock newBlock = new ExtendedBlock(firstBlock.getBlockPoolId(),
+ newBlockId, 0, firstBlock.getGenerationStamp());
// test PIPELINE_SETUP_CREATE on a new block
testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
@@ -337,7 +340,8 @@ public class TestDataTransferProtocol ex
createFile(fileSys, file, fileLen);
// get the first blockid for the file
- Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ final String poolId = firstBlock.getBlockPoolId();
long newBlockId = firstBlock.getBlockId() + 1;
recvBuf.reset();
@@ -357,7 +361,7 @@ public class TestDataTransferProtocol ex
/* Test OP_WRITE_BLOCK */
sendBuf.reset();
DataTransferProtocol.Sender.opWriteBlock(sendOut,
- new Block(newBlockId), 0,
+ new ExtendedBlock(poolId, newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -371,7 +375,7 @@ public class TestDataTransferProtocol ex
sendBuf.reset();
recvBuf.reset();
DataTransferProtocol.Sender.opWriteBlock(sendOut,
- new Block(++newBlockId), 0,
+ new ExtendedBlock(poolId, ++newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -395,7 +399,7 @@ public class TestDataTransferProtocol ex
sendBuf.reset();
recvBuf.reset();
DataTransferProtocol.Sender.opWriteBlock(sendOut,
- new Block(++newBlockId), 0,
+ new ExtendedBlock(poolId, ++newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -418,7 +422,8 @@ public class TestDataTransferProtocol ex
/* Test OP_READ_BLOCK */
- Block blk = new Block(firstBlock);
+ String bpid = cluster.getNamesystem().getBlockPoolId();
+ ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
long blkid = blk.getBlockId();
// bad block id
sendBuf.reset();
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Fri Apr 29 18:16:32 2011
@@ -30,8 +30,8 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -65,11 +65,11 @@ public class TestDatanodeBlockScanner ex
* @throws IOException
* @throws TimeoutException
*/
- private static long waitForVerification(DatanodeInfo dn, FileSystem fs,
+ private static long waitForVerification(int infoPort, FileSystem fs,
Path file, int blocksValidated,
long newTime, long timeout)
throws IOException, TimeoutException {
- URL url = new URL("http://localhost:" + dn.getInfoPort() +
+ URL url = new URL("http://localhost:" + infoPort +
"/blockScannerReport?listblocks");
long lastWarnTime = System.currentTimeMillis();
if (newTime <= 0) newTime = 1L;
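waitForVerification now takes the info port directly and polls the block scanner servlet at the URL built above. A minimal sketch of fetching one report, assuming only standard java.net/java.io classes:

    URL url = new URL("http://localhost:" + infoPort
        + "/blockScannerReport?listblocks");
    BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream()));
    String line;
    while ((line = in.readLine()) != null) {
      // scan each listed block's last verification time
    }
    in.close();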
@@ -146,7 +146,8 @@ public class TestDatanodeBlockScanner ex
/*
* The cluster restarted. The block should be verified by now.
*/
- assertTrue(waitForVerification(dn, fs, file1, 1, startTime, TIMEOUT) >= startTime);
+ assertTrue(waitForVerification(dn.getInfoPort(), fs, file1, 1, startTime,
+ TIMEOUT) >= startTime);
/*
* Create a new file and read the block. The block should be marked
@@ -155,12 +156,17 @@ public class TestDatanodeBlockScanner ex
DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(),
conf, true);
- assertTrue(waitForVerification(dn, fs, file2, 2, startTime, TIMEOUT) >= startTime);
+ assertTrue(waitForVerification(dn.getInfoPort(), fs, file2, 2, startTime,
+ TIMEOUT) >= startTime);
cluster.shutdown();
}
- public void testBlockCorruptionPolicy() throws Exception {
+ public static boolean corruptReplica(ExtendedBlock blk, int replica) throws IOException {
+ return MiniDFSCluster.corruptReplica(replica, blk);
+ }
+
+ public void testBlockCorruptionPolicy() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
Random random = new Random();
@@ -172,13 +178,13 @@ public class TestDatanodeBlockScanner ex
fs = cluster.getFileSystem();
Path file1 = new Path("/tmp/testBlockVerification/file1");
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
- String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
DFSTestUtil.waitReplication(fs, file1, (short)3);
assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
// Corrupt random replica of block
- assertTrue(cluster.corruptReplica(block, rand));
+ assertTrue(MiniDFSCluster.corruptReplica(rand, block));
// Restart the datanode hoping the corrupt block to be reported
cluster.restartDataNode(rand);
@@ -189,9 +195,9 @@ public class TestDatanodeBlockScanner ex
// Corrupt all replicas. Now, block should be marked as corrupt
// and we should get all the replicas
- assertTrue(cluster.corruptReplica(block, 0));
- assertTrue(cluster.corruptReplica(block, 1));
- assertTrue(cluster.corruptReplica(block, 2));
+ assertTrue(MiniDFSCluster.corruptReplica(0, block));
+ assertTrue(MiniDFSCluster.corruptReplica(1, block));
+ assertTrue(MiniDFSCluster.corruptReplica(2, block));
// Read the file to trigger reportBadBlocks by client
try {
@@ -251,8 +257,7 @@ public class TestDatanodeBlockScanner ex
FileSystem fs = cluster.getFileSystem();
Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
- Block blk = DFSTestUtil.getFirstBlock(fs, file1);
- String block = blk.getBlockName();
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
// Wait until block is replicated to numReplicas
DFSTestUtil.waitReplication(fs, file1, numReplicas);
@@ -260,7 +265,7 @@ public class TestDatanodeBlockScanner ex
// Corrupt numCorruptReplicas replicas of block
int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
- if (cluster.corruptReplica(block, i)) {
+ if (corruptReplica(block, i)) {
corruptReplicasDNIDs[j++] = i;
LOG.info("successfully corrupted block " + block + " on node "
+ i + " " + cluster.getDataNodes().get(i).getSelfAddr());
@@ -281,7 +286,7 @@ public class TestDatanodeBlockScanner ex
// Loop until all corrupt replicas are reported
DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
- blk, numCorruptReplicas);
+ block, numCorruptReplicas);
// Loop until the block recovers after replication
DFSTestUtil.waitReplication(fs, file1, numReplicas);
@@ -290,7 +295,7 @@ public class TestDatanodeBlockScanner ex
// Make sure the corrupt replica is invalidated and removed from
// corruptReplicasMap
DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
- blk, 0);
+ block, 0);
cluster.shutdown();
}
@@ -299,7 +304,6 @@ public class TestDatanodeBlockScanner ex
final Configuration conf = new HdfsConfiguration();
final short REPLICATION_FACTOR = (short)2;
final Path fileName = new Path("/file1");
- String block; //block file name
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3L);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
@@ -312,11 +316,12 @@ public class TestDatanodeBlockScanner ex
.build();
cluster.waitActive();
+ ExtendedBlock block;
try {
FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
- block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
+ block = DFSTestUtil.getFirstBlock(fs, fileName);
} finally {
cluster.shutdown();
}
@@ -330,8 +335,8 @@ public class TestDatanodeBlockScanner ex
cluster.waitActive();
try {
FileSystem fs = cluster.getFileSystem();
- DatanodeInfo dn = new DatanodeInfo(cluster.getDataNodes().get(0).dnRegistration);
- assertTrue(waitForVerification(dn, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
+ int infoPort = cluster.getDataNodes().get(0).getInfoPort();
+ assertTrue(waitForVerification(infoPort, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
// Truncate replica of block
if (!changeReplicaLength(block, 0, -1)) {
@@ -373,43 +378,31 @@ public class TestDatanodeBlockScanner ex
/**
* Change the length of a block at datanode dnIndex
*/
- static boolean changeReplicaLength(String blockName, int dnIndex, int lenDelta) throws IOException {
- File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
- for (int i=dnIndex*2; i<dnIndex*2+2; i++) {
- File blockFile = new File(baseDir, "data" + (i+1) +
- MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
- if (blockFile.exists()) {
- RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
- long origLen = raFile.length();
- raFile.setLength(origLen + lenDelta);
- raFile.close();
- LOG.info("assigned length " + (origLen + lenDelta)
- + " to block file " + blockFile.getPath()
- + " on datanode " + dnIndex);
- return true;
- }
+ static boolean changeReplicaLength(ExtendedBlock blk, int dnIndex,
+ int lenDelta) throws IOException {
+ File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
+ if (blockFile != null && blockFile.exists()) {
+ RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+ raFile.setLength(raFile.length()+lenDelta);
+ raFile.close();
+ return true;
}
- LOG.info("failed to change length of block " + blockName);
+ LOG.info("failed to change length of block " + blk);
return false;
}
- private static void waitForBlockDeleted(String blockName, int dnIndex,
- long timeout)
- throws IOException, TimeoutException, InterruptedException {
- File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
- File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1) +
- MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
- File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2) +
- MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
+ private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex,
+ long timeout) throws IOException, TimeoutException, InterruptedException {
+ File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
long failtime = System.currentTimeMillis()
+ ((timeout > 0) ? timeout : Long.MAX_VALUE);
- while (blockFile1.exists() || blockFile2.exists()) {
+ while (blockFile != null && blockFile.exists()) {
if (failtime < System.currentTimeMillis()) {
throw new TimeoutException("waited too long for blocks to be deleted: "
- + blockFile1.getPath() + (blockFile1.exists() ? " still exists; " : " is absent; ")
- + blockFile2.getPath() + (blockFile2.exists() ? " still exists." : " is absent."));
+ + blockFile.getPath() + (blockFile.exists() ? " still exists." : " is absent."));
}
Thread.sleep(100);
+ blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
}
}
}
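
In condensed form, the replica-manipulation pattern these hunks move to: tests now resolve a replica's file through the block-pool-aware MiniDFSCluster.getBlockFile(dnIndex, ExtendedBlock) instead of reconstructing data-directory paths from a block name. A minimal sketch, assuming the helpers shown above (the path and length delta are illustrative):

    // Truncate datanode 0's replica of a file's first block by one byte.
    ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, new Path("/file1"));
    File blockFile = MiniDFSCluster.getBlockFile(0, blk);
    if (blockFile != null && blockFile.exists()) {
      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
      raFile.setLength(raFile.length() - 1);  // shrink the replica
      raFile.close();
    }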
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java Fri Apr 29 18:16:32 2011
@@ -72,7 +72,7 @@ public class TestDatanodeConfig {
@Test
public void testDataDirectories() throws IOException {
File dataDir = new File(BASE_DIR, "data").getCanonicalFile();
- Configuration conf = cluster.getConfiguration();
+ Configuration conf = cluster.getConfiguration(0);
// 1. Test unsupported schema. Only "file:" is supported.
String dnDir = makeURI("shv", null, fileAsURI(dataDir).getPath());
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dnDir);
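
A minimal sketch of the per-namenode configuration access this hunk switches to, assuming the test's own fileAsURI helper and BASE_DIR constant (index 0 selects the first namenode of a federated MiniDFSCluster):

    // Point the datanode at a local data directory; only the "file:"
    // scheme is accepted for dfs.datanode.data.dir.
    Configuration conf = cluster.getConfiguration(0);
    File dataDir = new File(BASE_DIR, "data").getCanonicalFile();
    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
        fileAsURI(dataDir).toString());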
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeRegistration.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeRegistration.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeRegistration.java Fri Apr 29 18:16:32 2011
@@ -17,14 +17,11 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.fs.Path;
import junit.framework.TestCase;
/**
@@ -41,11 +38,8 @@ public class TestDatanodeRegistration ex
public void testChangeIpcPort() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
- FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
- fs = cluster.getFileSystem();
-
InetSocketAddress addr = new InetSocketAddress(
"localhost",
cluster.getNameNodePort());
@@ -71,8 +65,7 @@ public class TestDatanodeRegistration ex
fail("Never got a heartbeat from restarted datanode.");
}
- int realIpcPort = cluster.getDataNodes().get(0)
- .dnRegistration.getIpcPort();
+ int realIpcPort = cluster.getDataNodes().get(0).getIpcPort();
// Now make sure the reported IPC port is the correct one.
assertEquals(realIpcPort, report[0].getIpcPort());
} finally {
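
The registration checks now go through DataNode accessors rather than the dnRegistration field. A condensed sketch, with report obtained from datanodeReport as above:

    // The namenode-reported IPC port must match the datanode's own getter.
    int realIpcPort = cluster.getDataNodes().get(0).getIpcPort();
    assertEquals(realIpcPort, report[0].getIpcPort());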
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Fri Apr 29 18:16:32 2011
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -27,7 +26,6 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -117,22 +115,9 @@ public class TestDecommission {
rand.nextBytes(buffer);
stm.write(buffer);
stm.close();
+ LOG.info("Created file " + name + " with " + repl + " replicas.");
}
- private void printFileLocations(FileSystem fileSys, Path name)
- throws IOException {
- BlockLocation[] locations = fileSys.getFileBlockLocations(
- fileSys.getFileStatus(name), 0, fileSize);
- for (int idx = 0; idx < locations.length; idx++) {
- String[] loc = locations[idx].getHosts();
- StringBuilder buf = new StringBuilder("Block[" + idx + "] : ");
- for (int j = 0; j < loc.length; j++) {
- buf.append(loc[j] + " ");
- }
- LOG.info(buf.toString());
- }
- }
-
/**
* For blocks that reside on the nodes that are down, verify that their
* replication factor is 1 more than the specified one.
@@ -170,7 +155,7 @@ public class TestDecommission {
}
LOG.info("Block " + blk.getBlock() + " has " + hasdown
+ " decommissioned replica.");
- assertEquals("Number of replicas for block" + blk.getBlock(),
+ assertEquals("Number of replicas for block " + blk.getBlock(),
Math.min(numDatanodes, repl+hasdown), nodes.length);
}
}
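
The expectation asserted here, as a one-line sketch: when hasdown replicas sit on decommissioned nodes, re-replication raises the expected location count, capped by the cluster size (variable names as in checkFile above):

    // Expected number of replica locations for each block.
    int expected = Math.min(numDatanodes, repl + hasdown);
    assertEquals(expected, nodes.length);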
@@ -181,22 +166,15 @@ public class TestDecommission {
assertTrue(!fileSys.exists(name));
}
- private void printDatanodeReport(DatanodeInfo[] info) {
- LOG.info("-------------------------------------------------");
- for (int i = 0; i < info.length; i++) {
- LOG.info(info[i].getDatanodeReport());
- LOG.info("");
- }
- }
-
/*
- * decommission one random node.
+ * decommission one random node and wait for it to reach the
+ * given {@code waitForState}.
*/
- private DatanodeInfo decommissionNode(FSNamesystem namesystem,
- ArrayList<String>decommissionedNodes,
+ private DatanodeInfo decommissionNode(int nnIndex,
+ ArrayList<DatanodeInfo>decommissionedNodes,
AdminStates waitForState)
throws IOException {
- DFSClient client = getDfsClient(cluster, conf);
+ DFSClient client = getDfsClient(cluster.getNameNode(nnIndex), conf);
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
//
@@ -214,11 +192,16 @@ public class TestDecommission {
LOG.info("Decommissioning node: " + nodename);
// write nodename into the exclude file.
- ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
+ ArrayList<String> nodes = new ArrayList<String>();
+ if (decommissionedNodes != null) {
+ for (DatanodeInfo dn : decommissionedNodes) {
+ nodes.add(dn.getName());
+ }
+ }
nodes.add(nodename);
writeConfigFile(excludeFile, nodes);
- namesystem.refreshNodes(conf);
- DatanodeInfo ret = namesystem.getDatanode(info[index]);
+ cluster.getNamesystem(nnIndex).refreshNodes(conf);
+ DatanodeInfo ret = cluster.getNamesystem(nnIndex).getDatanode(info[index]);
waitNodeState(ret, waitForState);
return ret;
}
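
Condensed, the decommission flow decommissionNode implements, assuming the test's writeConfigFile, excludeFile, and waitNodeState helpers:

    // List the datanode in the exclude file, refresh the target namenode,
    // then block until the node reports the requested admin state.
    nodes.add(nodename);
    writeConfigFile(excludeFile, nodes);
    cluster.getNamesystem(nnIndex).refreshNodes(conf);
    DatanodeInfo ret = cluster.getNamesystem(nnIndex).getDatanode(info[index]);
    waitNodeState(ret, waitForState);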
@@ -239,14 +222,13 @@ public class TestDecommission {
}
done = state == node.getAdminState();
}
+ LOG.info("node " + node + " reached the state " + state);
}
/* Get DFSClient to the namenode */
- private static DFSClient getDfsClient(MiniDFSCluster cluster,
+ private static DFSClient getDfsClient(NameNode nn,
Configuration conf) throws IOException {
- InetSocketAddress addr = new InetSocketAddress("localhost",
- cluster.getNameNodePort());
- return new DFSClient(addr, conf);
+ return new DFSClient(nn.getNameNodeAddress(), conf);
}
/* Validate cluster has expected number of datanodes */
@@ -258,13 +240,15 @@ public class TestDecommission {
/** Start a MiniDFSCluster
* @throws IOException */
- private void startCluster(int numDatanodes, Configuration conf)
- throws IOException {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
- .build();
+ private void startCluster(int numNameNodes, int numDatanodes,
+ Configuration conf) throws IOException {
+ cluster = new MiniDFSCluster.Builder(conf).numNameNodes(numNameNodes)
+ .numDataNodes(numDatanodes).build();
cluster.waitActive();
- DFSClient client = getDfsClient(cluster, conf);
- validateCluster(client, numDatanodes);
+ for (int i = 0; i < numNameNodes; i++) {
+ DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
+ validateCluster(client, numDatanodes);
+ }
}
private void verifyStats(NameNode namenode, FSNamesystem fsn,
@@ -293,90 +277,134 @@ public class TestDecommission {
}
/**
- * Tests Decommission in DFS.
+ * Tests decommission for a non-federated cluster
*/
@Test
public void testDecommission() throws IOException {
+ testDecommission(1, 6);
+ }
+
+ /**
+ * Tests decommission for a federated cluster
+ */
+ @Test
+ public void testDecommissionFederation() throws IOException {
+ testDecommission(2, 2);
+ }
+
+ private void testDecommission(int numNamenodes, int numDatanodes)
+ throws IOException {
LOG.info("Starting test testDecommission");
- int numDatanodes = 6;
- startCluster(numDatanodes, conf);
- try {
- DFSClient client = getDfsClient(cluster, conf);
- FileSystem fileSys = cluster.getFileSystem();
- FSNamesystem fsn = cluster.getNamesystem();
- ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
- for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
- int replicas = numDatanodes - iteration - 1;
+ startCluster(numNamenodes, numDatanodes, conf);
+
+ ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
+ new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
+ for(int i = 0; i < numNamenodes; i++) {
+ namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
+ }
+ Path file1 = new Path("testDecommission.dat");
+ for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
+ int replicas = numDatanodes - iteration - 1;
+
+ // Decommission a datanode under each namenode, one namenode at a time
+ for (int i = 0; i < numNamenodes; i++) {
+ ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
+ FileSystem fileSys = cluster.getFileSystem(i);
+ writeFile(fileSys, file1, replicas);
// Decommission one node. Verify that node is decommissioned.
- Path file1 = new Path("testDecommission.dat");
- writeFile(fileSys, file1, replicas);
- LOG.info("Created file decommission.dat with " + replicas
- + " replicas.");
- printFileLocations(fileSys, file1);
- DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes,
+ DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
AdminStates.DECOMMISSIONED);
- decommissionedNodes.add(downnode.getName());
+ decommissionedNodes.add(decomNode);
// Ensure decommissioned datanode is not automatically shutdown
+ DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
-
- checkFile(fileSys, file1, replicas, downnode.getName(), numDatanodes);
+ checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes);
cleanupFile(fileSys, file1);
}
-
- // Restart the cluster and ensure decommissioned datanodes
- // are allowed to register with the namenode
- cluster.shutdown();
- startCluster(numDatanodes, conf);
- } catch (IOException e) {
- DFSClient client = getDfsClient(cluster, conf);
- DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
- printDatanodeReport(info);
- throw e;
}
+
+ // Restart the cluster and ensure decommissioned datanodes
+ // are allowed to register with the namenode
+ cluster.shutdown();
+ startCluster(numNamenodes, numDatanodes, conf);
+ }
+
+ /**
+ * Tests cluster storage statistics during decommissioning for a
+ * non-federated cluster
+ */
+ @Test
+ public void testClusterStats() throws Exception {
+ testClusterStats(1);
}
/**
- * Tests cluster storage statistics during decommissioning
+ * Tests cluster storage statistics during decommissioning for
+ * a federated cluster
*/
@Test
- public void testClusterStats() throws IOException, InterruptedException {
+ public void testClusterStatsFederation() throws Exception {
+ testClusterStats(3);
+ }
+
+ public void testClusterStats(int numNameNodes) throws IOException,
+ InterruptedException {
LOG.info("Starting test testClusterStats");
int numDatanodes = 1;
- startCluster(numDatanodes, conf);
-
- FileSystem fileSys = cluster.getFileSystem();
- Path file = new Path("testClusterStats.dat");
- writeFile(fileSys, file, 1);
+ startCluster(numNameNodes, numDatanodes, conf);
- FSNamesystem fsn = cluster.getNamesystem();
- NameNode namenode = cluster.getNameNode();
- ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
- DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes,
- AdminStates.DECOMMISSION_INPROGRESS);
- // Check namenode stats for multiple datanode heartbeats
- verifyStats(namenode, fsn, downnode, true);
-
- // Stop decommissioning and verify stats
- writeConfigFile(excludeFile, null);
- fsn.refreshNodes(conf);
- DatanodeInfo ret = fsn.getDatanode(downnode);
- waitNodeState(ret, AdminStates.NORMAL);
- verifyStats(namenode, fsn, ret, false);
+ for (int i = 0; i < numNameNodes; i++) {
+ FileSystem fileSys = cluster.getFileSystem(i);
+ Path file = new Path("testClusterStats.dat");
+ writeFile(fileSys, file, 1);
+
+ FSNamesystem fsn = cluster.getNamesystem(i);
+ NameNode namenode = cluster.getNameNode(i);
+ DatanodeInfo downnode = decommissionNode(i, null,
+ AdminStates.DECOMMISSION_INPROGRESS);
+ // Check namenode stats for multiple datanode heartbeats
+ verifyStats(namenode, fsn, downnode, true);
+
+ // Stop decommissioning and verify stats
+ writeConfigFile(excludeFile, null);
+ fsn.refreshNodes(conf);
+ DatanodeInfo ret = fsn.getDatanode(downnode);
+ waitNodeState(ret, AdminStates.NORMAL);
+ verifyStats(namenode, fsn, ret, false);
+ }
}
/**
- * Test host file or include file functionality. Only datanodes
- * in the include file are allowed to connect to the namenode.
+ * Test host/include file functionality. Only datanodes
+ * in the include file are allowed to connect to the namenode in a
+ * non-federated cluster.
*/
@Test
public void testHostsFile() throws IOException, InterruptedException {
+ // Test for a single namenode cluster
+ testHostsFile(1);
+ }
+
+ /**
+ * Test host/include file functionality. Only datanodes
+ * in the include file are allowed to connect to the namenode in a
+ * federated cluster.
+ */
+ @Test
+ public void testHostsFileFederation() throws IOException, InterruptedException {
+ // Test for a 3-namenode federated cluster
+ testHostsFile(3);
+ }
+
+ public void testHostsFile(int numNameNodes) throws IOException,
+ InterruptedException {
conf.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
int numDatanodes = 1;
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
- .setupHostsFile(true).build();
+ cluster = new MiniDFSCluster.Builder(conf).numNameNodes(numNameNodes)
+ .numDataNodes(numDatanodes).setupHostsFile(true).build();
cluster.waitActive();
// Now write an invalid entry to the hosts file and ensure the datanode is disallowed
@@ -384,15 +412,18 @@ public class TestDecommission {
ArrayList<String>list = new ArrayList<String>();
list.add("invalidhost");
writeConfigFile(hostsFile, list);
- cluster.getNamesystem().refreshNodes(conf);
- DFSClient client = getDfsClient(cluster, conf);
- DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
- for (int i = 0 ; i < 5 && info.length != 0; i++) {
- LOG.info("Waiting for datanode to be marked dead");
- Thread.sleep(HEARTBEAT_INTERVAL * 1000);
- info = client.datanodeReport(DatanodeReportType.LIVE);
+ for (int j = 0; j < numNameNodes; j++) {
+ cluster.getNamesystem(j).refreshNodes(conf);
+
+ DFSClient client = getDfsClient(cluster.getNameNode(j), conf);
+ DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+ for (int i = 0 ; i < 5 && info.length != 0; i++) {
+ LOG.info("Waiting for datanode to be marked dead");
+ Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+ info = client.datanodeReport(DatanodeReportType.LIVE);
+ }
+ assertEquals("Number of live nodes should be 0", 0, info.length);
}
- assertEquals("Number of live nodes should be 0", 0, info.length);
}
}
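
Taken together, the federation changes in this file follow one pattern: each per-cluster helper gains a namenode index, and the single-namenode test becomes the one-namenode case of a loop. A minimal sketch of that shape, using the helpers above:

    // Federated variants run the same assertions once per namenode.
    for (int i = 0; i < numNameNodes; i++) {
      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
      validateCluster(client, numDatanodes);
    }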
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Apr 29 18:16:32 2011
@@ -426,7 +426,7 @@ public class TestDistributedFileSystem {
hdfs.setPermission(new Path(dir), new FsPermission((short)0));
try {
final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
- final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, "somegroup");
+ final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, 0, "somegroup");
hftp2.getFileChecksum(qualified);
fail();
} catch(IOException ioe) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java Fri Apr 29 18:16:32 2011
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.HardLink;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -138,8 +138,8 @@ public class TestFileAppend{
// Create hard links for a few of the blocks
//
for (int i = 0; i < blocks.size(); i = i + 2) {
- Block b = blocks.get(i).getBlock();
- File f = dataset.getFile(b);
+ ExtendedBlock b = blocks.get(i).getBlock();
+ File f = dataset.getFile(b.getBlockPoolId(), b.getLocalBlock());
File link = new File(f.toString() + ".link");
System.out.println("Creating hardlink for File " + f + " to " + link);
HardLink.createHardLink(f, link);
@@ -149,7 +149,7 @@ public class TestFileAppend{
// Detach all blocks. This should remove hardlinks (if any)
//
for (int i = 0; i < blocks.size(); i++) {
- Block b = blocks.get(i).getBlock();
+ ExtendedBlock b = blocks.get(i).getBlock();
System.out.println("testCopyOnWrite detaching block " + b);
assertTrue("Detaching block " + b + " should have returned true",
dataset.unlinkBlock(b, 1));
@@ -159,7 +159,7 @@ public class TestFileAppend{
// return false
//
for (int i = 0; i < blocks.size(); i++) {
- Block b = blocks.get(i).getBlock();
+ ExtendedBlock b = blocks.get(i).getBlock();
System.out.println("testCopyOnWrite detaching block " + b);
assertTrue("Detaching block " + b + " should have returned false",
!dataset.unlinkBlock(b, 1));
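
The ExtendedBlock migration in this file reduces to one idiom: an ExtendedBlock pairs a block-pool id with the local block, and dataset lookups now take both parts. A sketch using the methods shown above:

    ExtendedBlock b = blocks.get(i).getBlock();
    File f = dataset.getFile(b.getBlockPoolId(), b.getLocalBlock());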
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Fri Apr 29 18:16:32 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -191,7 +192,7 @@ public class TestFileAppend3 extends jun
final LocatedBlocks locatedblocks = fs.dfs.getNamenode().getBlockLocations(p.toString(), 0L, len1);
assertEquals(1, locatedblocks.locatedBlockCount());
final LocatedBlock lb = locatedblocks.get(0);
- final Block blk = lb.getBlock();
+ final ExtendedBlock blk = lb.getBlock();
assertEquals(len1, lb.getBlockSize());
DatanodeInfo[] datanodeinfos = lb.getLocations();
@@ -260,14 +261,15 @@ public class TestFileAppend3 extends jun
final int numblock = locatedblocks.locatedBlockCount();
for(int i = 0; i < numblock; i++) {
final LocatedBlock lb = locatedblocks.get(i);
- final Block blk = lb.getBlock();
+ final ExtendedBlock blk = lb.getBlock();
final long size = lb.getBlockSize();
if (i < numblock - 1) {
assertEquals(BLOCK_SIZE, size);
}
for(DatanodeInfo datanodeinfo : lb.getLocations()) {
final DataNode dn = cluster.getDataNode(datanodeinfo.getIpcPort());
- final Block metainfo = dn.data.getStoredBlock(blk.getBlockId());
+ final Block metainfo = dn.data.getStoredBlock(blk.getBlockPoolId(),
+ blk.getBlockId());
assertEquals(size, metainfo.getNumBytes());
}
}
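
The same migration applied to the stored-block lookup, sketched with the two-argument getStoredBlock shown above:

    // Resolve the on-disk replica by (blockPoolId, blockId) and check
    // that its recorded length matches the located block's size.
    Block metainfo = dn.data.getStoredBlock(blk.getBlockPoolId(),
        blk.getBlockId());
    assertEquals(size, metainfo.getNumBytes());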
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java Fri Apr 29 18:16:32 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -158,7 +159,7 @@ public class TestFileAppend4 {
// Delay completeFile
DelayAnswer delayer = new DelayAnswer();
doAnswer(delayer).when(spyNN).complete(
- anyString(), anyString(), (Block)anyObject());
+ anyString(), anyString(), (ExtendedBlock)anyObject());
DFSClient client = new DFSClient(null, spyNN, conf, null);
file1 = new Path("/testRecoverFinalized");
@@ -228,7 +229,8 @@ public class TestFileAppend4 {
// Delay completeFile
DelayAnswer delayer = new DelayAnswer();
- doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), (Block)anyObject());
+ doAnswer(delayer).when(spyNN).complete(anyString(), anyString(),
+ (ExtendedBlock) anyObject());
DFSClient client = new DFSClient(null, spyNN, conf, null);
file1 = new Path("/testCompleteOtherLease");