Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/29 18:28:51 UTC
svn commit: r1152295 [6/10] - in /hadoop/common/trunk/hdfs: ./ bin/ ivy/
src/docs/src/documentation/content/xdocs/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/common/ src/j...
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * A FileOutputStream that has the property that it will only show
+ * up at its destination once it has been entirely written and flushed
+ * to disk. While being written, it will use a .tmp suffix.
+ *
+ * When the output stream is closed, it is flushed, fsynced, and
+ * will be moved into place, overwriting any file that already
+ * exists at that location.
+ *
+ * <b>NOTE</b>: on Windows platforms, it will not atomically
+ * replace the target file - instead the target file is deleted
+ * before this one is moved into place.
+ */
+public class AtomicFileOutputStream extends FilterOutputStream {
+
+ private static final String TMP_EXTENSION = ".tmp";
+
+ private final static Log LOG = LogFactory.getLog(
+ AtomicFileOutputStream.class);
+
+ private final File origFile;
+ private final File tmpFile;
+
+ public AtomicFileOutputStream(File f) throws FileNotFoundException {
+ // Code unfortunately must be duplicated below since we can't assign anything
+ // before calling super
+ super(new FileOutputStream(new File(f.getParentFile(), f.getName() + TMP_EXTENSION)));
+ origFile = f.getAbsoluteFile();
+ tmpFile = new File(f.getParentFile(), f.getName() + TMP_EXTENSION).getAbsoluteFile();
+ }
+
+ @Override
+ public void close() throws IOException {
+ boolean triedToClose = false, success = false;
+ try {
+ flush();
+ ((FileOutputStream)out).getChannel().force(true);
+
+ triedToClose = true;
+ super.close();
+ success = true;
+ } finally {
+ if (success) {
+ boolean renamed = tmpFile.renameTo(origFile);
+ if (!renamed) {
+ // On windows, renameTo does not replace.
+ if (!origFile.delete() || !tmpFile.renameTo(origFile)) {
+ throw new IOException("Could not rename temporary file " +
+ tmpFile + " to " + origFile);
+ }
+ }
+ } else {
+ if (!triedToClose) {
+ // If we failed when flushing, try to close it to not leak an FD
+ IOUtils.closeStream(out);
+ }
+ // close wasn't successful, try to delete the tmp file
+ if (!tmpFile.delete()) {
+ LOG.warn("Unable to delete tmp file " + tmpFile);
+ }
+ }
+ }
+ }
+
+}
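A minimal usage sketch for the class above (the target path is illustrative): bytes written to the stream land in a sibling ".tmp" file and only appear at the target once close() has flushed, fsynced, and renamed it into place.

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;

public class AtomicWriteExample {
  public static void main(String[] args) throws IOException {
    // Illustrative target path; while being written, the bytes live in
    // a sibling file named "seen_txid.tmp".
    File target = new File("/tmp/storage/current/seen_txid");
    target.getParentFile().mkdirs();

    AtomicFileOutputStream out = new AtomicFileOutputStream(target);
    try {
      out.write("42\n".getBytes());
    } finally {
      // close() flushes and fsyncs, then renames the .tmp file over the
      // target (replacing any existing file); if flushing or closing fails,
      // the temporary file is deleted instead of being moved into place.
      out.close();
    }
  }
}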
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Static functions for dealing with files of the same format
+ * that the Unix "md5sum" utility writes.
+ */
+public abstract class MD5FileUtils {
+ private static final Log LOG = LogFactory.getLog(
+ MD5FileUtils.class);
+
+ private static final String MD5_SUFFIX = ".md5";
+ private static final Pattern LINE_REGEX =
+ Pattern.compile("([0-9a-f]{32}) [ \\*](.+)");
+
+ /**
+ * Verify that the previously saved md5 for the given file matches
+ * expectedMd5.
+ * @throws IOException
+ */
+ public static void verifySavedMD5(File dataFile, MD5Hash expectedMD5)
+ throws IOException {
+ MD5Hash storedHash = readStoredMd5ForFile(dataFile);
+ // Check the hash itself
+ if (!expectedMD5.equals(storedHash)) {
+ throw new IOException(
+ "File " + dataFile + " did not match stored MD5 checksum " +
+ " (stored: " + storedHash + ", computed: " + expectedMD5);
+ }
+ }
+
+ /**
+ * Read the md5 checksum stored alongside the given file, or null
+ * if no md5 is stored.
+ * @param dataFile the file containing data
+ * @return the checksum stored in dataFile.md5
+ */
+ public static MD5Hash readStoredMd5ForFile(File dataFile) throws IOException {
+ File md5File = getDigestFileForFile(dataFile);
+
+ String md5Line;
+
+ if (!md5File.exists()) {
+ return null;
+ }
+
+ BufferedReader reader =
+ new BufferedReader(new FileReader(md5File));
+ try {
+ md5Line = reader.readLine();
+ if (md5Line == null) { md5Line = ""; }
+ md5Line = md5Line.trim();
+ } catch (IOException ioe) {
+ throw new IOException("Error reading md5 file at " + md5File, ioe);
+ } finally {
+ IOUtils.cleanup(LOG, reader);
+ }
+
+ Matcher matcher = LINE_REGEX.matcher(md5Line);
+ if (!matcher.matches()) {
+ throw new IOException("Invalid MD5 file at " + md5File
+ + " (does not match expected pattern)");
+ }
+ String storedHash = matcher.group(1);
+ File referencedFile = new File(matcher.group(2));
+
+ // Sanity check: Make sure that the file referenced in the .md5 file at
+ // least has the same name as the file we expect
+ if (!referencedFile.getName().equals(dataFile.getName())) {
+ throw new IOException(
+ "MD5 file at " + md5File + " references file named " +
+ referencedFile.getName() + " but we expected it to reference " +
+ dataFile);
+ }
+ return new MD5Hash(storedHash);
+ }
+
+ /**
+ * Read dataFile and compute its MD5 checksum.
+ */
+ public static MD5Hash computeMd5ForFile(File dataFile) throws IOException {
+ InputStream in = new FileInputStream(dataFile);
+ try {
+ MessageDigest digester = MD5Hash.getDigester();
+ DigestInputStream dis = new DigestInputStream(in, digester);
+ IOUtils.copyBytes(dis, new IOUtils.NullOutputStream(), 128*1024);
+
+ return new MD5Hash(digester.digest());
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
+
+ /**
+ * Save the ".md5" file that lists the md5sum of another file.
+ * @param dataFile the original file whose md5 was computed
+ * @param digest the computed digest
+ * @throws IOException
+ */
+ public static void saveMD5File(File dataFile, MD5Hash digest)
+ throws IOException {
+ File md5File = getDigestFileForFile(dataFile);
+ String digestString = StringUtils.byteToHexString(
+ digest.getDigest());
+ String md5Line = digestString + " *" + dataFile.getName() + "\n";
+
+ AtomicFileOutputStream afos = new AtomicFileOutputStream(md5File);
+ afos.write(md5Line.getBytes());
+ afos.close();
+ LOG.debug("Saved MD5 " + digest + " to " + md5File);
+ }
+
+ /**
+ * @return a reference to the file with .md5 suffix that will
+ * contain the md5 checksum for the given data file.
+ */
+ public static File getDigestFileForFile(File file) {
+ return new File(file.getParentFile(), file.getName() + MD5_SUFFIX);
+ }
+}
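A rough round-trip sketch for the utility above (file names are illustrative): saveMD5File writes a "<name>.md5" sidecar in md5sum format, and verifySavedMD5 throws an IOException if a later recomputation does not match the stored digest.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;

public class Md5SidecarExample {
  public static void main(String[] args) throws IOException {
    // Illustrative data file; in the NameNode this would be an fsimage file.
    File dataFile = new File("/tmp/fsimage_example");
    FileOutputStream fos = new FileOutputStream(dataFile);
    fos.write("some image bytes".getBytes());
    fos.close();

    // Compute the digest and persist it as "/tmp/fsimage_example.md5".
    MD5Hash digest = MD5FileUtils.computeMd5ForFile(dataFile);
    MD5FileUtils.saveMD5File(dataFile, digest);

    // Later, recompute and compare against the stored digest; a mismatch
    // raises an IOException reporting both hashes.
    MD5Hash recomputed = MD5FileUtils.computeMd5ForFile(dataFile);
    MD5FileUtils.verifySavedMD5(dataFile, recomputed);
  }
}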
Modified: hadoop/common/trunk/hdfs/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/findbugsExcludeFile.xml?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/common/trunk/hdfs/src/test/findbugsExcludeFile.xml Fri Jul 29 16:28:45 2011
@@ -230,6 +230,15 @@
</Match>
<!--
+ lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots.
+ See the comments in BackupImage for justification.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.server.namenode.FSImage" />
+ <Field name="lastAppliedTxId" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
+ <!--
Findbugs doesn't realize that closing a FilterOutputStream pushes the close down to
wrapped streams, too.
-->
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Jul 29 16:28:45 2011
@@ -95,6 +95,7 @@ public class MiniDFSCluster {
*/
public static class Builder {
private int nameNodePort = 0;
+ private int nameNodeHttpPort = 0;
private final Configuration conf;
private int numNameNodes = 1;
private int numDataNodes = 1;
@@ -130,6 +131,14 @@ public class MiniDFSCluster {
this.nameNodePort = val;
return this;
}
+
+ /**
+ * Default: 0
+ */
+ public Builder nameNodeHttpPort(int val) {
+ this.nameNodeHttpPort = val;
+ return this;
+ }
/**
* Default: 1
@@ -247,6 +256,7 @@ public class MiniDFSCluster {
builder.federation = true;
initMiniDFSCluster(builder.nameNodePort,
+ builder.nameNodeHttpPort,
builder.conf,
builder.numDataNodes,
builder.format,
@@ -473,12 +483,13 @@ public class MiniDFSCluster {
String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException {
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
- initMiniDFSCluster(nameNodePort, conf, numDataNodes, format,
+ initMiniDFSCluster(nameNodePort, 0, conf, numDataNodes, format,
manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
simulatedCapacities, null, true, false, false);
}
- private void initMiniDFSCluster(int nameNodePort, Configuration conf,
+ private void initMiniDFSCluster(int nameNodePort, int nameNodeHttpPort,
+ Configuration conf,
int numDataNodes, boolean format, boolean manageNameDfsDirs,
boolean manageDataDfsDirs, StartupOption operation, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId,
@@ -526,7 +537,8 @@ public class MiniDFSCluster {
if (!federation) {
conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "127.0.0.1:" + nameNodePort);
- conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+ conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:"
+ + nameNodeHttpPort);
NameNode nn = createNameNode(0, conf, numDataNodes, manageNameDfsDirs,
format, operation, clusterId);
nameNodes[0] = new NameNodeInfo(nn, conf);
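As a sketch of how a test might use the new builder option (port numbers are illustrative; the default of 0 still means "pick any free port"):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class MiniClusterHttpPortExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nameNodePort(9000)        // illustrative fixed RPC port
        .nameNodeHttpPort(50070)   // illustrative fixed HTTP port (the new option)
        .numDataNodes(1)
        .build();
    try {
      FileSystem fs = cluster.getFileSystem();
      // ... exercise the cluster through fs ...
    } finally {
      cluster.shutdown();
    }
  }
}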
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java Fri Jul 29 16:28:45 2011
@@ -18,14 +18,18 @@
package org.apache.hadoop.hdfs;
import java.io.File;
-import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType.NAME_NODE;
import static org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType.DATA_NODE;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
+
+import com.google.common.collect.Lists;
/**
* This test ensures the appropriate response from the system when
@@ -57,14 +61,17 @@ public class TestDFSFinalize extends Tes
* because its removal is asynchronous therefore we have no reliable
* way to know when it will happen.
*/
- void checkResult(String[] nameNodeDirs, String[] dataNodeDirs) throws IOException {
+ static void checkResult(String[] nameNodeDirs, String[] dataNodeDirs) throws Exception {
+ List<File> dirs = Lists.newArrayList();
for (int i = 0; i < nameNodeDirs.length; i++) {
- assertTrue(new File(nameNodeDirs[i],"current").isDirectory());
- assertTrue(new File(nameNodeDirs[i],"current/VERSION").isFile());
- assertTrue(new File(nameNodeDirs[i],"current/edits").isFile());
- assertTrue(new File(nameNodeDirs[i],"current/fsimage").isFile());
- assertTrue(new File(nameNodeDirs[i],"current/fstime").isFile());
+ File curDir = new File(nameNodeDirs[i], "current");
+ dirs.add(curDir);
+ FSImageTestUtil.assertReasonableNameCurrentDir(curDir);
}
+
+ FSImageTestUtil.assertParallelFilesAreIdentical(
+ dirs, Collections.<String>emptySet());
+
for (int i = 0; i < dataNodeDirs.length; i++) {
assertEquals(
UpgradeUtilities.checksumContents(
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java Fri Jul 29 16:28:45 2011
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.ser
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import junit.framework.TestCase;
@@ -32,8 +34,11 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.collect.Lists;
+
/**
* This test ensures the appropriate response (successful or failure) from
* the system when the system is rolled back under various storage state and
@@ -61,26 +66,26 @@ public class TestDFSRollback extends Tes
* Verify that the new current directory is the old previous.
* It is assumed that the server has recovered and rolled back.
*/
- void checkResult(NodeType nodeType, String[] baseDirs) throws IOException {
- switch (nodeType) {
- case NAME_NODE:
- for (int i = 0; i < baseDirs.length; i++) {
- assertTrue(new File(baseDirs[i],"current").isDirectory());
- assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
- assertTrue(new File(baseDirs[i],"current/edits").isFile());
- assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
- assertTrue(new File(baseDirs[i],"current/fstime").isFile());
- }
- break;
- case DATA_NODE:
- for (int i = 0; i < baseDirs.length; i++) {
+ void checkResult(NodeType nodeType, String[] baseDirs) throws Exception {
+ List<File> curDirs = Lists.newArrayList();
+ for (String baseDir : baseDirs) {
+ File curDir = new File(baseDir, "current");
+ curDirs.add(curDir);
+ switch (nodeType) {
+ case NAME_NODE:
+ FSImageTestUtil.assertReasonableNameCurrentDir(curDir);
+ break;
+ case DATA_NODE:
assertEquals(
- UpgradeUtilities.checksumContents(
- nodeType, new File(baseDirs[i],"current")),
- UpgradeUtilities.checksumMasterDataNodeContents());
+ UpgradeUtilities.checksumContents(nodeType, curDir),
+ UpgradeUtilities.checksumMasterDataNodeContents());
+ break;
}
- break;
}
+
+ FSImageTestUtil.assertParallelFilesAreIdentical(
+ curDirs, Collections.<String>emptySet());
+
for (int i = 0; i < baseDirs.length; i++) {
assertFalse(new File(baseDirs[i],"previous").isDirectory());
}
@@ -241,21 +246,17 @@ public class TestDFSRollback extends Tes
log("NameNode rollback with no edits file", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
- for (File f : baseDirs) {
- FileUtil.fullyDelete(new File(f,"edits"));
- }
+ deleteMatchingFiles(baseDirs, "edits.*");
startNameNodeShouldFail(StartupOption.ROLLBACK,
- "Edits file is not found");
+ "but there are no logs to load");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with no image file", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
- for (File f : baseDirs) {
- FileUtil.fullyDelete(new File(f,"fsimage"));
- }
+ deleteMatchingFiles(baseDirs, "fsimage_.*");
startNameNodeShouldFail(StartupOption.ROLLBACK,
- "Image file is not found");
+ "No valid image files found");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with corrupt version file", numDirs);
@@ -284,6 +285,16 @@ public class TestDFSRollback extends Tes
} // end numDir loop
}
+ private void deleteMatchingFiles(File[] baseDirs, String regex) {
+ for (File baseDir : baseDirs) {
+ for (File f : baseDir.listFiles()) {
+ if (f.getName().matches(regex)) {
+ f.delete();
+ }
+ }
+ }
+ }
+
protected void tearDown() throws Exception {
LOG.info("Shutting down MiniDFSCluster");
if (cluster != null) cluster.shutdown();
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java Fri Jul 29 16:28:45 2011
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import static org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType.NAME_NODE;
import static org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType.DATA_NODE;
@@ -42,47 +43,54 @@ public class TestDFSStorageStateRecovery
private int testCounter = 0;
private MiniDFSCluster cluster = null;
+ // Constants for indexes into test case table below.
+ private static final int CURRENT_EXISTS = 0;
+ private static final int PREVIOUS_EXISTS = 1;
+ private static final int PREVIOUS_TMP_EXISTS = 2;
+ private static final int REMOVED_TMP_EXISTS = 3;
+ private static final int SHOULD_RECOVER = 4;
+ private static final int CURRENT_SHOULD_EXIST_AFTER_RECOVER = 5;
+ private static final int PREVIOUS_SHOULD_EXIST_AFTER_RECOVER = 6;
+
/**
* The test case table. Each row represents a test case. This table is
* taken from the table in Appendix A of the HDFS Upgrade Test Plan
* (TestPlan-HdfsUpgrade.html) attached to
* http://issues.apache.org/jira/browse/HADOOP-702
+ *
+ * It has been slightly modified since previouscheckpoint.tmp no longer
+ * exists.
+ *
* The column meanings are:
* 0) current directory exists
* 1) previous directory exists
* 2) previous.tmp directory exists
* 3) removed.tmp directory exists
- * 4) lastcheckpoint.tmp directory exists
- * 5) node should recover and startup
- * 6) current directory should exist after recovery but before startup
- * 7) previous directory should exist after recovery but before startup
+ * 4) node should recover and startup
+ * 5) current directory should exist after recovery but before startup
+ * 6) previous directory should exist after recovery but before startup
*/
static boolean[][] testCases = new boolean[][] {
- new boolean[] {true, false, false, false, false, true, true, false}, // 1
- new boolean[] {true, true, false, false, false, true, true, true }, // 2
- new boolean[] {true, false, true, false, false, true, true, true }, // 3
- new boolean[] {true, true, true, true, false, false, false, false}, // 4
- new boolean[] {true, true, true, false, false, false, false, false}, // 4
- new boolean[] {false, true, true, true, false, false, false, false}, // 4
- new boolean[] {false, true, true, false, false, false, false, false}, // 4
- new boolean[] {false, false, false, false, false, false, false, false}, // 5
- new boolean[] {false, true, false, false, false, false, false, false}, // 6
- new boolean[] {false, false, true, false, false, true, true, false}, // 7
- new boolean[] {true, false, false, true, false, true, true, false}, // 8
- new boolean[] {true, true, false, true, false, false, false, false}, // 9
- new boolean[] {true, true, true, true, false, false, false, false}, // 10
- new boolean[] {true, false, true, true, false, false, false, false}, // 10
- new boolean[] {false, true, true, true, false, false, false, false}, // 10
- new boolean[] {false, false, true, true, false, false, false, false}, // 10
- new boolean[] {false, false, false, true, false, false, false, false}, // 11
- new boolean[] {false, true, false, true, false, true, true, true }, // 12
+ new boolean[] {true, false, false, false, true, true, false}, // 1
+ new boolean[] {true, true, false, false, true, true, true }, // 2
+ new boolean[] {true, false, true, false, true, true, true }, // 3
+ new boolean[] {true, true, true, true, false, false, false}, // 4
+ new boolean[] {true, true, true, false, false, false, false}, // 4
+ new boolean[] {false, true, true, true, false, false, false}, // 4
+ new boolean[] {false, true, true, false, false, false, false}, // 4
+ new boolean[] {false, false, false, false, false, false, false}, // 5
+ new boolean[] {false, true, false, false, false, false, false}, // 6
+ new boolean[] {false, false, true, false, true, true, false}, // 7
+ new boolean[] {true, false, false, true, true, true, false}, // 8
+ new boolean[] {true, true, false, true, false, false, false}, // 9
+ new boolean[] {true, true, true, true, false, false, false}, // 10
+ new boolean[] {true, false, true, true, false, false, false}, // 10
+ new boolean[] {false, true, true, true, false, false, false}, // 10
+ new boolean[] {false, false, true, true, false, false, false}, // 10
+ new boolean[] {false, false, false, true, false, false, false}, // 11
+ new boolean[] {false, true, false, true, true, true, true }, // 12
// name-node specific cases
- new boolean[] {true, false, false, false, true, true, true, false}, // 13
- new boolean[] {true, true, false, false, true, true, true, false}, // 13
- new boolean[] {false, false, false, false, true, true, true, false}, // 14
- new boolean[] {false, true, false, false, true, true, true, false}, // 14
- new boolean[] {true, false, true, false, true, false, false, false}, // 15
- new boolean[] {true, true, false, true, true, false, false, false} // 16
+ new boolean[] {true, true, false, false, true, true, false}, // 13
};
private static final int NUM_NN_TEST_CASES = testCases.length;
@@ -98,14 +106,13 @@ public class TestDFSStorageStateRecovery
+ label + ":"
+ " numDirs="+numDirs
+ " testCase="+testCaseNum
- + " current="+state[0]
- + " previous="+state[1]
- + " previous.tmp="+state[2]
- + " removed.tmp="+state[3]
- + " lastcheckpoint.tmp="+state[4]
- + " should recover="+state[5]
- + " current exists after="+state[6]
- + " previous exists after="+state[7]);
+ + " current="+state[CURRENT_EXISTS]
+ + " previous="+state[PREVIOUS_EXISTS]
+ + " previous.tmp="+state[PREVIOUS_TMP_EXISTS]
+ + " removed.tmp="+state[REMOVED_TMP_EXISTS]
+ + " should recover="+state[SHOULD_RECOVER]
+ + " current exists after="+state[CURRENT_SHOULD_EXIST_AFTER_RECOVER]
+ + " previous exists after="+state[PREVIOUS_SHOULD_EXIST_AFTER_RECOVER]);
}
/**
@@ -125,16 +132,15 @@ public class TestDFSStorageStateRecovery
String[] createNameNodeStorageState(boolean[] state) throws Exception {
String[] baseDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
UpgradeUtilities.createEmptyDirs(baseDirs);
- if (state[0]) // current
+ if (state[CURRENT_EXISTS]) // current
UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "current");
- if (state[1]) // previous
+ if (state[PREVIOUS_EXISTS]) // previous
UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "previous");
- if (state[2]) // previous.tmp
+ if (state[PREVIOUS_TMP_EXISTS]) // previous.tmp
UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "previous.tmp");
- if (state[3]) // removed.tmp
+ if (state[REMOVED_TMP_EXISTS]) // removed.tmp
UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "removed.tmp");
- if (state[4]) // lastcheckpoint.tmp
- UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "lastcheckpoint.tmp");
+
return baseDirs;
}
@@ -154,16 +160,15 @@ public class TestDFSStorageStateRecovery
String[] createDataNodeStorageState(boolean[] state) throws Exception {
String[] baseDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
UpgradeUtilities.createEmptyDirs(baseDirs);
- if (state[0]) // current
+ if (state[CURRENT_EXISTS]) // current
UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "current");
- if (state[1]) // previous
+ if (state[PREVIOUS_EXISTS]) // previous
UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "previous");
- if (state[2]) // previous.tmp
+ if (state[PREVIOUS_TMP_EXISTS]) // previous.tmp
UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "previous.tmp");
- if (state[3]) // removed.tmp
+ if (state[REMOVED_TMP_EXISTS]) // removed.tmp
UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "removed.tmp");
- if (state[4]) // lastcheckpoint.tmp
- UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "lastcheckpoint.tmp");
+
return baseDirs;
}
@@ -189,19 +194,16 @@ public class TestDFSStorageStateRecovery
// After copying the storage directories from master datanode, empty
// the block pool storage directories
String[] bpDirs = UpgradeUtilities.createEmptyBPDirs(baseDirs, bpid);
- if (state[0]) // current
+ if (state[CURRENT_EXISTS]) // current
UpgradeUtilities.createBlockPoolStorageDirs(baseDirs, "current", bpid);
- if (state[1]) // previous
+ if (state[PREVIOUS_EXISTS]) // previous
UpgradeUtilities.createBlockPoolStorageDirs(baseDirs, "previous", bpid);
- if (state[2]) // previous.tmp
+ if (state[PREVIOUS_TMP_EXISTS]) // previous.tmp
UpgradeUtilities.createBlockPoolStorageDirs(baseDirs, "previous.tmp",
bpid);
- if (state[3]) // removed.tmp
+ if (state[REMOVED_TMP_EXISTS]) // removed.tmp
UpgradeUtilities
.createBlockPoolStorageDirs(baseDirs, "removed.tmp", bpid);
- if (state[4]) // lastcheckpoint.tmp
- UpgradeUtilities.createBlockPoolStorageDirs(baseDirs,
- "lastcheckpoint.tmp", bpid);
return bpDirs;
}
@@ -220,9 +222,9 @@ public class TestDFSStorageStateRecovery
for (int i = 0; i < baseDirs.length; i++) {
assertTrue(new File(baseDirs[i],"current").isDirectory());
assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
- assertTrue(new File(baseDirs[i],"current/edits").isFile());
- assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
- assertTrue(new File(baseDirs[i],"current/fstime").isFile());
+ assertNotNull(FSImageTestUtil.findNewestImageFile(
+ baseDirs[i] + "/current"));
+ assertTrue(new File(baseDirs[i],"current/seen_txid").isFile());
}
}
if (previousShouldExist) {
@@ -318,9 +320,9 @@ public class TestDFSStorageStateRecovery
conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
for (int i = 0; i < NUM_NN_TEST_CASES; i++) {
boolean[] testCase = testCases[i];
- boolean shouldRecover = testCase[5];
- boolean curAfterRecover = testCase[6];
- boolean prevAfterRecover = testCase[7];
+ boolean shouldRecover = testCase[SHOULD_RECOVER];
+ boolean curAfterRecover = testCase[CURRENT_SHOULD_EXIST_AFTER_RECOVER];
+ boolean prevAfterRecover = testCase[PREVIOUS_SHOULD_EXIST_AFTER_RECOVER];
log("NAME_NODE recovery", numDirs, i, testCase);
baseDirs = createNameNodeStorageState(testCase);
@@ -336,8 +338,8 @@ public class TestDFSStorageStateRecovery
// the exception is expected
// check that the message says "not formatted"
// when storage directory is empty (case #5)
- if(!testCases[i][0] && !testCases[i][2]
- && !testCases[i][1] && !testCases[i][3] && !testCases[i][4]) {
+ if(!testCases[i][CURRENT_EXISTS] && !testCases[i][PREVIOUS_TMP_EXISTS]
+ && !testCases[i][PREVIOUS_EXISTS] && !testCases[i][REMOVED_TMP_EXISTS]) {
assertTrue(expected.getLocalizedMessage().contains(
"NameNode is not formatted"));
}
@@ -362,16 +364,16 @@ public class TestDFSStorageStateRecovery
conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
for (int i = 0; i < NUM_DN_TEST_CASES; i++) {
boolean[] testCase = testCases[i];
- boolean shouldRecover = testCase[5];
- boolean curAfterRecover = testCase[6];
- boolean prevAfterRecover = testCase[7];
+ boolean shouldRecover = testCase[SHOULD_RECOVER];
+ boolean curAfterRecover = testCase[CURRENT_SHOULD_EXIST_AFTER_RECOVER];
+ boolean prevAfterRecover = testCase[PREVIOUS_SHOULD_EXIST_AFTER_RECOVER];
log("DATA_NODE recovery", numDirs, i, testCase);
createNameNodeStorageState(new boolean[] { true, true, false, false,
false });
cluster = createCluster(conf);
baseDirs = createDataNodeStorageState(testCase);
- if (!testCase[0] && !testCase[1] && !testCase[2] && !testCase[3]) {
+ if (!testCase[CURRENT_EXISTS] && !testCase[PREVIOUS_EXISTS] && !testCase[PREVIOUS_TMP_EXISTS] && !testCase[REMOVED_TMP_EXISTS]) {
// DataNode will create and format current if no directories exist
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
} else {
@@ -403,16 +405,16 @@ public class TestDFSStorageStateRecovery
conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
for (int i = 0; i < NUM_DN_TEST_CASES; i++) {
boolean[] testCase = testCases[i];
- boolean shouldRecover = testCase[5];
- boolean curAfterRecover = testCase[6];
- boolean prevAfterRecover = testCase[7];
+ boolean shouldRecover = testCase[SHOULD_RECOVER];
+ boolean curAfterRecover = testCase[CURRENT_SHOULD_EXIST_AFTER_RECOVER];
+ boolean prevAfterRecover = testCase[PREVIOUS_SHOULD_EXIST_AFTER_RECOVER];
log("BLOCK_POOL recovery", numDirs, i, testCase);
createNameNodeStorageState(new boolean[] { true, true, false, false,
false });
cluster = createCluster(conf);
baseDirs = createBlockPoolStorageState(bpid, testCase);
- if (!testCase[0] && !testCase[1] && !testCase[2] && !testCase[3]) {
+ if (!testCase[CURRENT_EXISTS] && !testCase[PREVIOUS_EXISTS] && !testCase[PREVIOUS_TMP_EXISTS] && !testCase[REMOVED_TMP_EXISTS]) {
// DataNode will create and format current if no directories exist
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
} else {
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java Fri Jul 29 16:28:45 2011
@@ -27,15 +27,20 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.TestParallelImageWrite;
+import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
+import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName;
+
import org.apache.hadoop.util.StringUtils;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+
+import com.google.common.base.Joiner;
+
import static org.junit.Assert.*;
/**
@@ -45,6 +50,7 @@ import static org.junit.Assert.*;
*/
public class TestDFSUpgrade {
+ private static final int EXPECTED_TXID = 17;
private static final Log LOG = LogFactory.getLog(TestDFSUpgrade.class.getName());
private Configuration conf;
private int testCounter = 0;
@@ -66,15 +72,22 @@ public class TestDFSUpgrade {
* its files with their original checksum. It is assumed that the
* server has recovered and upgraded.
*/
- void checkNameNode(String[] baseDirs) throws IOException {
- for (int i = 0; i < baseDirs.length; i++) {
- assertTrue(new File(baseDirs[i],"current").isDirectory());
- assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
- assertTrue(new File(baseDirs[i],"current/edits").isFile());
- assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
- assertTrue(new File(baseDirs[i],"current/fstime").isFile());
+ void checkNameNode(String[] baseDirs, long imageTxId) throws IOException {
+ for (String baseDir : baseDirs) {
+ LOG.info("Checking namenode directory " + baseDir);
+ LOG.info("==== Contents ====:\n " +
+ Joiner.on(" \n").join(new File(baseDir, "current").list()));
+ LOG.info("==================");
+
+ assertTrue(new File(baseDir,"current").isDirectory());
+ assertTrue(new File(baseDir,"current/VERSION").isFile());
+ assertTrue(new File(baseDir,"current/"
+ + getInProgressEditsFileName(imageTxId + 1)).isFile());
+ assertTrue(new File(baseDir,"current/"
+ + getImageFileName(imageTxId)).isFile());
+ assertTrue(new File(baseDir,"current/seen_txid").isFile());
- File previous = new File(baseDirs[i], "previous");
+ File previous = new File(baseDir, "previous");
assertTrue(previous.isDirectory());
assertEquals(UpgradeUtilities.checksumContents(NAME_NODE, previous),
UpgradeUtilities.checksumMasterNameNodeContents());
@@ -200,7 +213,6 @@ public class TestDFSUpgrade {
StorageInfo storageInfo = null;
for (int numDirs = 1; numDirs <= 2; numDirs++) {
conf = new HdfsConfiguration();
- conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
@@ -208,7 +220,7 @@ public class TestDFSUpgrade {
log("Normal NameNode upgrade", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
cluster = createCluster();
- checkNameNode(nameNodeDirs);
+ checkNameNode(nameNodeDirs, EXPECTED_TXID);
if (numDirs > 1)
TestParallelImageWrite.checkImages(cluster.getNamesystem(), numDirs);
cluster.shutdown();
@@ -277,25 +289,21 @@ public class TestDFSUpgrade {
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
log("NameNode upgrade with no edits file", numDirs);
- baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
- for (File f : baseDirs) {
- FileUtil.fullyDelete(new File(f,"edits"));
- }
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ deleteStorageFilesWithPrefix(nameNodeDirs, "edits_");
startNameNodeShouldFail(StartupOption.UPGRADE);
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode upgrade with no image file", numDirs);
- baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
- for (File f : baseDirs) {
- FileUtil.fullyDelete(new File(f,"fsimage"));
- }
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ deleteStorageFilesWithPrefix(nameNodeDirs, "fsimage_");
startNameNodeShouldFail(StartupOption.UPGRADE);
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode upgrade with corrupt version file", numDirs);
baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
for (File f : baseDirs) {
- UpgradeUtilities.corruptFile(new File(f,"VERSION"));
+ UpgradeUtilities.corruptFile(new File (f,"VERSION"));
}
startNameNodeShouldFail(StartupOption.UPGRADE);
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
@@ -338,7 +346,7 @@ public class TestDFSUpgrade {
log("Normal NameNode upgrade", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
cluster = createCluster();
- checkNameNode(nameNodeDirs);
+ checkNameNode(nameNodeDirs, EXPECTED_TXID);
TestParallelImageWrite.checkImages(cluster.getNamesystem(), numDirs);
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
@@ -369,6 +377,19 @@ public class TestDFSUpgrade {
}
}
+ private void deleteStorageFilesWithPrefix(String[] nameNodeDirs, String prefix)
+ throws Exception {
+ for (String baseDirStr : nameNodeDirs) {
+ File baseDir = new File(baseDirStr);
+ File currentDir = new File(baseDir, "current");
+ for (File f : currentDir.listFiles()) {
+ if (f.getName().startsWith(prefix)) {
+ assertTrue("Deleting " + f, f.delete());
+ }
+ }
+ }
+ }
+
@Test(expected=IOException.class)
public void testUpgradeFromPreUpgradeLVFails() throws IOException {
// Upgrade from versions prior to Storage#LAST_UPGRADABLE_LAYOUT_VERSION
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java Fri Jul 29 16:28:45 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.commons.logging.Log;
@@ -183,38 +184,6 @@ public class TestDFSUpgradeFromImage ext
}
}
- public void testUpgradeFromRel14Image() throws IOException {
- unpackStorage();
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new HdfsConfiguration();
- if (System.getProperty("test.build.data") == null) { // to allow test to be run outside of Ant
- System.setProperty("test.build.data", "build/test/data");
- }
- conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // block scanning off
- cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(numDataNodes)
- .format(false)
- .startupOption(StartupOption.UPGRADE)
- .clusterId("testClusterId")
- .build();
- cluster.waitActive();
- DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- DFSClient dfsClient = dfs.dfs;
- //Safemode will be off only after upgrade is complete. Wait for it.
- while ( dfsClient.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET) ) {
- LOG.info("Waiting for SafeMode to be OFF.");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {}
- }
-
- verifyFileSystem(dfs);
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
- }
-
/**
* Test that sets up a fake image from Hadoop 0.3.0 and tries to start a
* NN, verifying that the correct error message is thrown.
@@ -260,10 +229,50 @@ public class TestDFSUpgradeFromImage ext
}
/**
+ * Test upgrade from an 0.14 image
+ */
+ public void testUpgradeFromRel14Image() throws IOException {
+ unpackStorage();
+ upgradeAndVerify();
+ }
+
+ /**
* Test upgrade from 0.22 image
*/
public void testUpgradeFromRel22Image() throws IOException {
unpackStorage(HADOOP22_IMAGE);
+ upgradeAndVerify();
+ }
+
+ /**
+ * Test upgrade from 0.22 image with corrupt md5, make sure it
+ * fails to upgrade
+ */
+ public void testUpgradeFromCorruptRel22Image() throws IOException {
+ unpackStorage(HADOOP22_IMAGE);
+
+ // Overwrite the md5 stored in the VERSION files
+ File baseDir = new File(MiniDFSCluster.getBaseDirectory());
+ FSImageTestUtil.corruptVersionFile(
+ new File(baseDir, "name1/current/VERSION"),
+ "imageMD5Digest", "22222222222222222222222222222222");
+ FSImageTestUtil.corruptVersionFile(
+ new File(baseDir, "name2/current/VERSION"),
+ "imageMD5Digest", "22222222222222222222222222222222");
+
+ // Upgrade should now fail
+ try {
+ upgradeAndVerify();
+ fail("Upgrade did not fail with bad MD5");
+ } catch (IOException ioe) {
+ String msg = StringUtils.stringifyException(ioe);
+ if (!msg.contains("is corrupt with MD5 checksum")) {
+ throw ioe;
+ }
+ }
+ }
+
+ private void upgradeAndVerify() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new HdfsConfiguration();
@@ -287,8 +296,12 @@ public class TestDFSUpgradeFromImage ext
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
}
+
+ verifyFileSystem(dfs);
} finally {
if (cluster != null) { cluster.shutdown(); }
- }
+ }
}
+
+
}
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java Fri Jul 29 16:28:45 2011
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
@@ -45,11 +44,10 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
/* File Append tests for HDFS-200 & HDFS-142, specifically focused on:
* using append()/sync() to recover block information
@@ -157,7 +155,7 @@ public class TestFileAppend4 {
NameNode spyNN = spy(preSpyNN);
// Delay completeFile
- DelayAnswer delayer = new DelayAnswer();
+ GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyNN).complete(
anyString(), anyString(), (ExtendedBlock)anyObject());
@@ -228,7 +226,8 @@ public class TestFileAppend4 {
NameNode spyNN = spy(preSpyNN);
// Delay completeFile
- DelayAnswer delayer = new DelayAnswer();
+ GenericTestUtils.DelayAnswer delayer =
+ new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyNN).complete(anyString(), anyString(),
(ExtendedBlock) anyObject());
@@ -290,42 +289,5 @@ public class TestFileAppend4 {
} finally {
cluster.shutdown();
}
- }
-
- /**
- * Mockito answer helper that triggers one latch as soon as the
- * method is called, then waits on another before continuing.
- */
- private static class DelayAnswer implements Answer<Object> {
- private final CountDownLatch fireLatch = new CountDownLatch(1);
- private final CountDownLatch waitLatch = new CountDownLatch(1);
-
- /**
- * Wait until the method is called.
- */
- public void waitForCall() throws InterruptedException {
- fireLatch.await();
- }
-
- /**
- * Tell the method to proceed.
- * This should only be called after waitForCall()
- */
- public void proceed() {
- waitLatch.countDown();
- }
-
- public Object answer(InvocationOnMock invocation) throws Throwable {
- LOG.info("DelayAnswer firing fireLatch");
- fireLatch.countDown();
- try {
- LOG.info("DelayAnswer waiting on waitLatch");
- waitLatch.await();
- LOG.info("DelayAnswer delay complete");
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted waiting on latch", ie);
- }
- return invocation.callRealMethod();
- }
}
}
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java Fri Jul 29 16:28:45 2011
@@ -25,8 +25,10 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
+import java.net.URI;
import java.util.Arrays;
import java.util.Random;
+import java.util.Collections;
import java.util.zip.CRC32;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -407,7 +409,9 @@ public class UpgradeUtilities {
*/
public static File[] createNameNodeVersionFile(Configuration conf,
File[] parent, StorageInfo version, String bpid) throws IOException {
- Storage storage = new NNStorage(conf);
+ Storage storage = new NNStorage(conf,
+ Collections.<URI>emptyList(),
+ Collections.<URI>emptyList());
storage.setStorageInfo(version);
File[] versionFiles = new File[parent.length];
for (int i = 0; i < parent.length; i++) {
Added: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/StorageAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/StorageAdapter.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/StorageAdapter.java (added)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/StorageAdapter.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.mockito.Mockito;
+
+/**
+ * Test methods that need to access package-private parts of
+ * Storage
+ */
+public abstract class StorageAdapter {
+
+ /**
+ * Inject and return a spy on a storage directory
+ */
+ public static StorageDirectory spyOnStorageDirectory(
+ Storage s, int idx) {
+
+ StorageDirectory dir = Mockito.spy(s.getStorageDir(idx));
+ s.storageDirs.set(idx, dir);
+ return dir;
+ }
+}
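A brief sketch of the intended use from a test (the stubbed return value is illustrative): the helper swaps a Mockito spy into the Storage's internal directory list, so the behaviour of a StorageDirectory can be stubbed or verified.

import java.io.File;

import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageAdapter;
import org.mockito.Mockito;

class StorageSpyExample {
  // Given any concrete Storage (e.g. an NNStorage) with at least one
  // directory, replace directory 0 with a spy and stub one of its getters.
  static StorageDirectory injectSpy(Storage storage) {
    StorageDirectory spy = StorageAdapter.spyOnStorageDirectory(storage, 0);
    Mockito.doReturn(new File("/tmp/fake-current")).when(spy).getCurrentDir();
    return spy;
  }
}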
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java Fri Jul 29 16:28:45 2011
@@ -190,11 +190,9 @@ public class CreateEditsLog {
}
}
- FSImage fsImage = new FSImage(editsLogDir.getAbsoluteFile().toURI());
- FileNameGenerator nameGenerator = new FileNameGenerator(BASE_PATH, 100);
- FSEditLog editLog = fsImage.getEditLog();
- editLog.createEditLogFile(fsImage.getStorage().getFsEditName());
+ FileNameGenerator nameGenerator = new FileNameGenerator(BASE_PATH, 100);
+ FSEditLog editLog = FSImageTestUtil.createStandaloneEditLog(editsLogDir);
editLog.open();
addFiles(editLog, numFiles, replication, numBlocksPerFile, startingBlockId,
nameGenerator);
Added: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java (added)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.mockito.Mockito;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Utility functions for testing fsimage storage.
+ */
+public abstract class FSImageTestUtil {
+
+ /**
+ * The position in the fsimage header where the txid is
+ * written.
+ */
+ private static final long IMAGE_TXID_POS = 24;
+
+ /**
+ * This function returns an md5 hash of a file.
+ *
+ * @param file input file
+ * @return The md5 string
+ */
+ public static String getFileMD5(File file) throws IOException {
+ return MD5FileUtils.computeMd5ForFile(file).toString();
+ }
+
+ /**
+ * Calculate the md5sum of an image after zeroing out the transaction ID
+ * field in the header. This is useful for tests that want to verify
+ * that two checkpoints have identical namespaces.
+ */
+ public static String getImageFileMD5IgnoringTxId(File imageFile)
+ throws IOException {
+ File tmpFile = File.createTempFile("hadoop_imagefile_tmp", "fsimage");
+ tmpFile.deleteOnExit();
+ try {
+ Files.copy(imageFile, tmpFile);
+ RandomAccessFile raf = new RandomAccessFile(tmpFile, "rw");
+ try {
+ raf.seek(IMAGE_TXID_POS);
+ raf.writeLong(0);
+ } finally {
+ IOUtils.closeStream(raf);
+ }
+ return getFileMD5(tmpFile);
+ } finally {
+ tmpFile.delete();
+ }
+ }
+
+ public static StorageDirectory mockStorageDirectory(
+ File currentDir, NameNodeDirType type) {
+ // Mock the StorageDirectory interface to just point to this file
+ StorageDirectory sd = Mockito.mock(StorageDirectory.class);
+ Mockito.doReturn(type)
+ .when(sd).getStorageDirType();
+ Mockito.doReturn(currentDir).when(sd).getCurrentDir();
+
+ Mockito.doReturn(mockFile(true)).when(sd).getVersionFile();
+ Mockito.doReturn(mockFile(false)).when(sd).getPreviousDir();
+ return sd;
+ }
+
+ static File mockFile(boolean exists) {
+ File mockFile = mock(File.class);
+ doReturn(exists).when(mockFile).exists();
+ return mockFile;
+ }
+
+ public static FSImageTransactionalStorageInspector inspectStorageDirectory(
+ File dir, NameNodeDirType dirType) throws IOException {
+ FSImageTransactionalStorageInspector inspector =
+ new FSImageTransactionalStorageInspector();
+ inspector.inspectDirectory(mockStorageDirectory(dir, dirType));
+ return inspector;
+ }
+
+
+ /**
+ * Return a standalone instance of FSEditLog that will log into the given
+ * log directory. The returned instance is not yet opened.
+ */
+ public static FSEditLog createStandaloneEditLog(File logDir)
+ throws IOException {
+ assertTrue(logDir.mkdirs() || logDir.exists());
+ Files.deleteDirectoryContents(logDir);
+ NNStorage storage = Mockito.mock(NNStorage.class);
+ List<StorageDirectory> sds = Lists.newArrayList(
+ FSImageTestUtil.mockStorageDirectory(logDir, NameNodeDirType.EDITS));
+ Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
+
+ return new FSEditLog(storage);
+ }
+
+ /**
+ * Assert that all of the given directories have the same newest fsimage
+ * filename, and that the files with that name hold the same data.
+ */
+ public static void assertSameNewestImage(List<File> dirs) throws Exception {
+ if (dirs.size() < 2) return;
+
+ long imageTxId = -1;
+
+ List<File> imageFiles = new ArrayList<File>();
+ for (File dir : dirs) {
+ FSImageTransactionalStorageInspector inspector =
+ inspectStorageDirectory(dir, NameNodeDirType.IMAGE);
+ FoundFSImage latestImage = inspector.getLatestImage();
+ assertNotNull("No image in " + dir, latestImage);
+ long thisTxId = latestImage.getTxId();
+ if (imageTxId != -1 && thisTxId != imageTxId) {
+ fail("Storage directory " + dir + " does not have the same " +
+ "last image index " + imageTxId + " as another");
+ }
+ imageTxId = thisTxId;
+ imageFiles.add(inspector.getLatestImage().getFile());
+ }
+
+ assertFileContentsSame(imageFiles.toArray(new File[0]));
+ }
+
+ /**
+ * Given a list of directories, assert that any files that are named
+ * the same thing have the same contents. For example, if a file
+ * named "fsimage_1" shows up in more than one directory, then it must
+ * be the same.
+ * @throws Exception
+ */
+ public static void assertParallelFilesAreIdentical(List<File> dirs,
+ Set<String> ignoredFileNames) throws Exception {
+ HashMap<String, List<File>> groupedByName = new HashMap<String, List<File>>();
+ for (File dir : dirs) {
+ for (File f : dir.listFiles()) {
+ if (ignoredFileNames.contains(f.getName())) {
+ continue;
+ }
+
+ List<File> fileList = groupedByName.get(f.getName());
+ if (fileList == null) {
+ fileList = new ArrayList<File>();
+ groupedByName.put(f.getName(), fileList);
+ }
+ fileList.add(f);
+ }
+ }
+
+ for (List<File> sameNameList : groupedByName.values()) {
+ if (sameNameList.get(0).isDirectory()) {
+ // recurse
+ assertParallelFilesAreIdentical(sameNameList, ignoredFileNames);
+ } else {
+ assertFileContentsSame(sameNameList.toArray(new File[0]));
+ }
+ }
+ }
+
+ /**
+ * Assert that all of the given files have exactly the same
+ * contents.
+ */
+ public static void assertFileContentsSame(File... files) throws Exception {
+ if (files.length < 2) return;
+
+ Map<File, String> md5s = getFileMD5s(files);
+ if (Sets.newHashSet(md5s.values()).size() > 1) {
+ fail("File contents differed:\n " +
+ Joiner.on("\n ")
+ .withKeyValueSeparator("=")
+ .join(md5s));
+ }
+ }
+
+ /**
+ * Assert that the given files are not all the same, and in fact that
+ * they have <code>expectedUniqueHashes</code> unique contents.
+ */
+ public static void assertFileContentsDifferent(
+ int expectedUniqueHashes,
+ File... files) throws Exception
+ {
+ Map<File, String> md5s = getFileMD5s(files);
+ if (Sets.newHashSet(md5s.values()).size() != expectedUniqueHashes) {
+ fail("Expected " + expectedUniqueHashes + " different hashes, got:\n " +
+ Joiner.on("\n ")
+ .withKeyValueSeparator("=")
+ .join(md5s));
+ }
+ }
+
+ public static Map<File, String> getFileMD5s(File... files) throws Exception {
+ Map<File, String> ret = Maps.newHashMap();
+ for (File f : files) {
+ assertTrue("Must exist: " + f, f.exists());
+ ret.put(f, getFileMD5(f));
+ }
+ return ret;
+ }
+
+ /**
+ * @return a List which contains the "current" dir for each storage
+ * directory of the given type.
+ */
+ public static List<File> getCurrentDirs(NNStorage storage,
+ NameNodeDirType type) {
+ List<File> ret = Lists.newArrayList();
+ for (StorageDirectory sd : storage.dirIterable(type)) {
+ ret.add(sd.getCurrentDir());
+ }
+ return ret;
+ }
+
+ /**
+ * @return the fsimage file with the most recent transaction ID in the
+ * given storage directory.
+ */
+ public static File findLatestImageFile(StorageDirectory sd)
+ throws IOException {
+ FSImageTransactionalStorageInspector inspector =
+ new FSImageTransactionalStorageInspector();
+ inspector.inspectDirectory(sd);
+
+ return inspector.getLatestImage().getFile();
+ }
+
+ /**
+ * @return the fsimage file with the most recent transaction ID in the
+ * given 'current/' directory.
+ */
+ public static File findNewestImageFile(String currentDirPath) throws IOException {
+ StorageDirectory sd = FSImageTestUtil.mockStorageDirectory(
+ new File(currentDirPath), NameNodeDirType.IMAGE);
+
+ FSImageTransactionalStorageInspector inspector =
+ new FSImageTransactionalStorageInspector();
+ inspector.inspectDirectory(sd);
+
+ FoundFSImage latestImage = inspector.getLatestImage();
+ return (latestImage == null) ? null : latestImage.getFile();
+ }
+
+ /**
+ * Assert that the NameNode has checkpoints at the expected
+ * transaction IDs.
+ */
+ static void assertNNHasCheckpoints(MiniDFSCluster cluster,
+ List<Integer> txids) {
+
+ for (File nameDir : getNameNodeCurrentDirs(cluster)) {
+ // Should have an fsimage_N file for each of the given checkpoint txids
+ for (long checkpointTxId : txids) {
+ File image = new File(nameDir,
+ NNStorage.getImageFileName(checkpointTxId));
+ assertTrue("Expected non-empty " + image, image.length() > 0);
+ }
+ }
+ }
+
+ static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster) {
+ List<File> nameDirs = Lists.newArrayList();
+ for (URI u : cluster.getNameDirs(0)) {
+ nameDirs.add(new File(u.getPath(), "current"));
+ }
+ return nameDirs;
+ }
+
+ /**
+ * @return the latest edits log, finalized or otherwise, from the given
+ * storage directory.
+ */
+ public static FoundEditLog findLatestEditsLog(StorageDirectory sd)
+ throws IOException {
+ FSImageTransactionalStorageInspector inspector =
+ new FSImageTransactionalStorageInspector();
+ inspector.inspectDirectory(sd);
+
+ List<FoundEditLog> foundEditLogs = Lists.newArrayList(
+ inspector.getFoundEditLogs());
+ return Collections.max(foundEditLogs, new Comparator<FoundEditLog>() {
+ @Override
+ public int compare(FoundEditLog a, FoundEditLog b) {
+ return ComparisonChain.start()
+ .compare(a.getStartTxId(), b.getStartTxId())
+ .compare(a.getLastTxId(), b.getLastTxId())
+ .result();
+ }
+ });
+ }
+
+ /**
+ * Corrupt the given VERSION file by replacing a given
+ * key with a new value and re-writing the file.
+ *
+ * @param versionFile the VERSION file to corrupt
+ * @param key the key to replace
+ * @param value the new value for this key
+ */
+ public static void corruptVersionFile(File versionFile, String key, String value)
+ throws IOException {
+ Properties props = new Properties();
+ FileInputStream fis = new FileInputStream(versionFile);
+ FileOutputStream out = null;
+ try {
+ props.load(fis);
+ IOUtils.closeStream(fis);
+
+ props.setProperty(key, value);
+
+ out = new FileOutputStream(versionFile);
+ props.store(out, null);
+
+ } finally {
+ IOUtils.cleanup(null, fis, out);
+ }
+ }
+
+ public static void assertReasonableNameCurrentDir(File curDir)
+ throws IOException {
+ assertTrue(curDir.isDirectory());
+ assertTrue(new File(curDir, "VERSION").isFile());
+ assertTrue(new File(curDir, "seen_txid").isFile());
+ File image = findNewestImageFile(curDir.toString());
+ assertNotNull(image);
+ }
+
+}
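
A minimal sketch of how a test might use these new helpers; the JUnit class name, directory paths, and package placement below are assumptions for illustration and are not part of this commit:

import java.io.File;

import org.junit.Test;
import static org.junit.Assert.assertEquals;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;

public class TestFSImageTestUtilUsage {
  @Test
  public void testCheckpointsIdenticalAcrossNameDirs() throws Exception {
    // Hypothetical "current/" dirs of two name directories that should hold
    // identical checkpoints after a test run.
    File nameDir1 = new File("/tmp/name1/current");
    File nameDir2 = new File("/tmp/name2/current");

    // Files with the same name (e.g. fsimage_42) must be byte-identical;
    // VERSION legitimately differs per directory, so it is ignored.
    FSImageTestUtil.assertParallelFilesAreIdentical(
        ImmutableList.<File>of(nameDir1, nameDir2),
        ImmutableSet.<String>of("VERSION"));

    // Compare the newest images while zeroing out the embedded txid field,
    // so two checkpoints of the same namespace compare equal.
    File img1 = FSImageTestUtil.findNewestImageFile(nameDir1.getPath());
    File img2 = FSImageTestUtil.findNewestImageFile(nameDir2.getPath());
    assertEquals(FSImageTestUtil.getImageFileMD5IgnoringTxId(img1),
                 FSImageTestUtil.getImageFileMD5IgnoringTxId(img2));
  }
}
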
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java Fri Jul 29 16:28:45 2011
@@ -69,8 +69,8 @@ public class OfflineEditsViewerHelper {
* @param editsFilename where to copy the edits
*/
public String generateEdits() throws IOException {
- runOperations();
- return getEditsFilename();
+ CheckpointSignature signature = runOperations();
+ return getEditsFilename(signature);
}
/**
@@ -78,13 +78,16 @@ public class OfflineEditsViewerHelper {
*
* @return edits file name for cluster
*/
- private String getEditsFilename() throws IOException {
+ private String getEditsFilename(CheckpointSignature sig) throws IOException {
FSImage image = cluster.getNameNode().getFSImage();
// it was set up to only have ONE StorageDirectory
Iterator<StorageDirectory> it
= image.getStorage().dirIterator(NameNodeDirType.EDITS);
StorageDirectory sd = it.next();
- return image.getStorage().getEditFile(sd).getAbsolutePath();
+ File ret = NNStorage.getFinalizedEditsFile(
+ sd, 1, sig.curSegmentTxId - 1);
+ assert ret.exists() : "expected " + ret + " to exist";
+ return ret.getAbsolutePath();
}
/**
@@ -131,7 +134,7 @@ public class OfflineEditsViewerHelper {
* OP_SET_NS_QUOTA (11)
* OP_CLEAR_NS_QUOTA (12)
*/
- private void runOperations() throws IOException {
+ private CheckpointSignature runOperations() throws IOException {
LOG.info("Creating edits by performing fs operations");
// no check, if it's not it throws an exception which is what we want
@@ -238,5 +241,8 @@ public class OfflineEditsViewerHelper {
locatedBlocks = DFSClientAdapter.callGetBlockLocations(
cluster.getNameNode(), filePath, 0L, bytes.length);
} while (locatedBlocks.isUnderConstruction());
+
+ // Force a roll so we get an OP_END_LOG_SEGMENT txn
+ return cluster.getNameNode().rollEditLog();
}
}
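
The change above stops asking the storage layer for "the edit file" and instead computes the finalized segment's name from its transaction-ID range (1 through curSegmentTxId - 1, since the roll at the end of runOperations() closes that segment). A rough sketch of the naming scheme this implies; the exact format string and zero-padding width are assumptions for illustration, not taken from this diff:

import java.io.File;

// Illustrative only: approximates the txid-based file names that
// getFinalizedEditsFile()/getImageFileName() resolve to. The
// "edits_<start>-<end>" / "fsimage_<txid>" pattern follows this change's
// comments; the 19-digit padding is assumed.
class TxidFileNamingSketch {
  static File finalizedEditsFile(File currentDir, long startTxId, long endTxId) {
    return new File(currentDir,
        String.format("edits_%019d-%019d", startTxId, endTxId));
  }

  static File imageFile(File currentDir, long txId) {
    return new File(currentDir, String.format("fsimage_%019d", txId));
  }

  public static void main(String[] args) {
    File cur = new File("/tmp/name1/current");  // hypothetical path
    System.out.println(finalizedEditsFile(cur, 1, 99));
    System.out.println(imageFile(cur, 99));
  }
}
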
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java Fri Jul 29 16:28:45 2011
@@ -19,9 +19,12 @@ package org.apache.hadoop.hdfs.server.na
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -29,14 +32,28 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import junit.framework.TestCase;
public class TestBackupNode extends TestCase {
public static final Log LOG = LogFactory.getLog(TestBackupNode.class);
+
+ static {
+ ((Log4JLogger)Checkpointer.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)BackupImage.LOG).getLogger().setLevel(Level.ALL);
+ }
+
static final String BASE_DIR = MiniDFSCluster.getBaseDirectory();
protected void setUp() throws Exception {
@@ -53,73 +70,185 @@ public class TestBackupNode extends Test
dirB.mkdirs();
}
- protected void tearDown() throws Exception {
- super.tearDown();
- File baseDir = new File(BASE_DIR);
- if(!(FileUtil.fullyDelete(baseDir)))
- throw new IOException("Cannot remove directory: " + baseDir);
- }
-
- static void writeFile(FileSystem fileSys, Path name, int repl)
- throws IOException {
- TestCheckpoint.writeFile(fileSys, name, repl);
- }
-
-
- static void checkFile(FileSystem fileSys, Path name, int repl)
- throws IOException {
- TestCheckpoint.checkFile(fileSys, name, repl);
- }
-
- void cleanupFile(FileSystem fileSys, Path name)
- throws IOException {
- TestCheckpoint.cleanupFile(fileSys, name);
- }
-
- static String getBackupNodeDir(StartupOption t, int i) {
- return BASE_DIR + "name" + t.getName() + i + "/";
+ static String getBackupNodeDir(StartupOption t, int idx) {
+ return BASE_DIR + "name" + t.getName() + idx + "/";
}
BackupNode startBackupNode(Configuration conf,
- StartupOption t, int i) throws IOException {
+ StartupOption startupOpt,
+ int idx) throws IOException {
Configuration c = new HdfsConfiguration(conf);
- String dirs = getBackupNodeDir(t, i);
+ String dirs = getBackupNodeDir(startupOpt, idx);
c.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, dirs);
c.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
"${" + DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY + "}");
- return (BackupNode)NameNode.createNameNode(new String[]{t.getName()}, c);
+ c.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY,
+ "127.0.0.1:0");
+
+ return (BackupNode)NameNode.createNameNode(new String[]{startupOpt.getName()}, c);
}
- void waitCheckpointDone(BackupNode backup) {
+ void waitCheckpointDone(
+ MiniDFSCluster cluster, BackupNode backup, long txid) {
+ long thisCheckpointTxId;
do {
try {
- LOG.info("Waiting checkpoint to complete...");
+ LOG.info("Waiting checkpoint to complete... " +
+ "checkpoint txid should increase above " + txid);
Thread.sleep(1000);
} catch (Exception e) {}
- } while(backup.getCheckpointState() != CheckpointStates.START);
+ thisCheckpointTxId = backup.getFSImage().getStorage()
+ .getMostRecentCheckpointTxId();
+
+ } while (thisCheckpointTxId < txid);
+
+ // Check that the checkpoint got uploaded to NN successfully
+ FSImageTestUtil.assertNNHasCheckpoints(cluster,
+ Collections.singletonList((int)thisCheckpointTxId));
}
- public void testCheckpoint() throws IOException {
+ public void testCheckpointNode() throws Exception {
testCheckpoint(StartupOption.CHECKPOINT);
- testCheckpoint(StartupOption.BACKUP);
}
+
+ /**
+ * Ensure that the BackupNode tails edits from the NN
+ * and stays in sync, even while the NN rolls its edit log,
+ * checkpoints occur, etc.
+ */
+ public void testBackupNodeTailsEdits() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = null;
+ FileSystem fileSys = null;
+ BackupNode backup = null;
- void testCheckpoint(StartupOption op) throws IOException {
+ try {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(0).build();
+ fileSys = cluster.getFileSystem();
+ backup = startBackupNode(conf, StartupOption.BACKUP, 1);
+
+ BackupImage bnImage = backup.getBNImage();
+ testBNInSync(cluster, backup, 1);
+
+ // Force a roll -- BN should roll with NN.
+ NameNode nn = cluster.getNameNode();
+ nn.rollEditLog();
+ assertEquals(bnImage.getEditLog().getCurSegmentTxId(),
+ nn.getFSImage().getEditLog().getCurSegmentTxId());
+
+ // BN should stay in sync after roll
+ testBNInSync(cluster, backup, 2);
+
+ long nnImageBefore =
+ nn.getFSImage().getStorage().getMostRecentCheckpointTxId();
+ // BN checkpoint
+ backup.doCheckpoint();
+
+ // NN should have received a new image
+ long nnImageAfter =
+ nn.getFSImage().getStorage().getMostRecentCheckpointTxId();
+
+ assertTrue("nn should have received new checkpoint. before: " +
+ nnImageBefore + " after: " + nnImageAfter,
+ nnImageAfter > nnImageBefore);
+
+ // BN should stay in sync after checkpoint
+ testBNInSync(cluster, backup, 3);
+
+ // Stop BN
+ StorageDirectory sd = bnImage.getStorage().getStorageDir(0);
+ backup.stop();
+ backup = null;
+
+ // When shutting down the BN, it shouldn't finalize logs that are
+ // still open on the NN
+ FoundEditLog editsLog = FSImageTestUtil.findLatestEditsLog(sd);
+ assertEquals(editsLog.getStartTxId(),
+ nn.getFSImage().getEditLog().getCurSegmentTxId());
+ assertTrue("Should not have finalized " + editsLog,
+ editsLog.isInProgress());
+
+ // do some edits
+ assertTrue(fileSys.mkdirs(new Path("/edit-while-bn-down")));
+
+ // start a new backup node
+ backup = startBackupNode(conf, StartupOption.BACKUP, 1);
+
+ testBNInSync(cluster, backup, 4);
+ assertNotNull(backup.getNamesystem().getFileInfo("/edit-while-bn-down", false));
+ } finally {
+ LOG.info("Shutting down...");
+ if (backup != null) backup.stop();
+ if (fileSys != null) fileSys.close();
+ if (cluster != null) cluster.shutdown();
+ }
+
+ assertStorageDirsMatch(cluster.getNameNode(), backup);
+ }
+
+ private void testBNInSync(MiniDFSCluster cluster, final BackupNode backup,
+ int testIdx) throws Exception {
+
+ final NameNode nn = cluster.getNameNode();
+ final FileSystem fs = cluster.getFileSystem();
+
+ // Do a bunch of namespace operations, make sure they're replicated
+ // to the BN.
+ for (int i = 0; i < 10; i++) {
+ final String src = "/test_" + testIdx + "_" + i;
+ LOG.info("Creating " + src + " on NN");
+ Path p = new Path(src);
+ assertTrue(fs.mkdirs(p));
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ LOG.info("Checking for " + src + " on BN");
+ try {
+ boolean hasFile = backup.getNamesystem().getFileInfo(src, false) != null;
+ boolean txnIdMatch = backup.getTransactionID() == nn.getTransactionID();
+ return hasFile && txnIdMatch;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 30, 10000);
+ }
+
+ assertStorageDirsMatch(nn, backup);
+ }
+
+ private void assertStorageDirsMatch(final NameNode nn, final BackupNode backup)
+ throws Exception {
+ // Check that the stored files in the name dirs are identical
+ List<File> dirs = Lists.newArrayList(
+ FSImageTestUtil.getCurrentDirs(nn.getFSImage().getStorage(),
+ null));
+ dirs.addAll(FSImageTestUtil.getCurrentDirs(backup.getFSImage().getStorage(),
+ null));
+ FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of("VERSION"));
+ }
+
+ public void testBackupNode() throws Exception {
+ testCheckpoint(StartupOption.BACKUP);
+ }
+
+ void testCheckpoint(StartupOption op) throws Exception {
Path file1 = new Path("checkpoint.dat");
Path file2 = new Path("checkpoint2.dat");
Configuration conf = new HdfsConfiguration();
- short replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
conf.set(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "0");
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // disable block scanner
- int numDatanodes = Math.max(3, replication);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1);
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
BackupNode backup = null;
try {
cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(numDatanodes).build();
+ .numDataNodes(0).build();
fileSys = cluster.getFileSystem();
//
// verify that 'format' really blew away all pre-existing files
@@ -130,14 +259,14 @@ public class TestBackupNode extends Test
//
// Create file1
//
- writeFile(fileSys, file1, replication);
- checkFile(fileSys, file1, replication);
+ assertTrue(fileSys.mkdirs(file1));
//
// Take a checkpoint
//
+ long txid = cluster.getNameNode().getTransactionID();
backup = startBackupNode(conf, op, 1);
- waitCheckpointDone(backup);
+ waitCheckpointDone(cluster, backup, txid);
} catch(IOException e) {
LOG.error("Error in TestBackupNode:", e);
assertTrue(e.getLocalizedMessage(), false);
@@ -146,32 +275,46 @@ public class TestBackupNode extends Test
if(fileSys != null) fileSys.close();
if(cluster != null) cluster.shutdown();
}
- File imageFileNN = new File(BASE_DIR, "name1/current/fsimage");
- File imageFileBN = new File(getBackupNodeDir(op, 1), "/current/fsimage");
- LOG.info("NameNode fsimage length = " + imageFileNN.length());
- LOG.info("Backup Node fsimage length = " + imageFileBN.length());
- assertTrue(imageFileNN.length() == imageFileBN.length());
+ File nnCurDir = new File(BASE_DIR, "name1/current/");
+ File bnCurDir = new File(getBackupNodeDir(op, 1), "/current/");
+ FSImageTestUtil.assertParallelFilesAreIdentical(
+ ImmutableList.of(bnCurDir, nnCurDir),
+ ImmutableSet.<String>of("VERSION"));
+
try {
//
// Restart cluster and verify that file1 still exist.
//
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false).build();
fileSys = cluster.getFileSystem();
// check that file1 still exists
- checkFile(fileSys, file1, replication);
- cleanupFile(fileSys, file1);
+ assertTrue(fileSys.exists(file1));
+ fileSys.delete(file1, true);
// create new file file2
- writeFile(fileSys, file2, replication);
- checkFile(fileSys, file2, replication);
+ fileSys.mkdirs(file2);
//
// Take a checkpoint
//
backup = startBackupNode(conf, op, 1);
- waitCheckpointDone(backup);
+ long txid = cluster.getNameNode().getTransactionID();
+ waitCheckpointDone(cluster, backup, txid);
+
+ for (int i = 0; i < 10; i++) {
+ fileSys.mkdirs(new Path("file_" + i));
+ }
+
+ txid = cluster.getNameNode().getTransactionID();
+ backup.doCheckpoint();
+ waitCheckpointDone(cluster, backup, txid);
+
+ txid = cluster.getNameNode().getTransactionID();
+ backup.doCheckpoint();
+ waitCheckpointDone(cluster, backup, txid);
+
} catch(IOException e) {
LOG.error("Error in TestBackupNode:", e);
assertTrue(e.getLocalizedMessage(), false);
@@ -180,22 +323,22 @@ public class TestBackupNode extends Test
if(fileSys != null) fileSys.close();
if(cluster != null) cluster.shutdown();
}
- LOG.info("NameNode fsimage length = " + imageFileNN.length());
- LOG.info("Backup Node fsimage length = " + imageFileBN.length());
- assertTrue(imageFileNN.length() == imageFileBN.length());
+ FSImageTestUtil.assertParallelFilesAreIdentical(
+ ImmutableList.of(bnCurDir, nnCurDir),
+ ImmutableSet.<String>of("VERSION"));
try {
//
// Restart cluster and verify that file2 exists and
// file1 does not exist.
//
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
fileSys = cluster.getFileSystem();
assertTrue(!fileSys.exists(file1));
// verify that file2 exists
- checkFile(fileSys, file2, replication);
+ assertTrue(fileSys.exists(file2));
} catch(IOException e) {
LOG.error("Error in TestBackupNode:", e);
assertTrue(e.getLocalizedMessage(), false);
@@ -204,52 +347,4 @@ public class TestBackupNode extends Test
cluster.shutdown();
}
}
-
- /**
- * Test that only one backup node can register.
- * @throws IOException
- */
- public void testBackupRegistration() throws IOException {
- Configuration conf1 = new HdfsConfiguration();
- Configuration conf2 = null;
- MiniDFSCluster cluster = null;
- BackupNode backup1 = null;
- BackupNode backup2 = null;
- try {
- // start name-node and backup node 1
- cluster = new MiniDFSCluster.Builder(conf1).numDataNodes(0).build();
- conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7771");
- conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7775");
- backup1 = startBackupNode(conf1, StartupOption.BACKUP, 1);
- // try to start backup node 2
- conf2 = new HdfsConfiguration(conf1);
- conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7772");
- conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7776");
- try {
- backup2 = startBackupNode(conf2, StartupOption.BACKUP, 2);
- backup2.stop();
- backup2 = null;
- assertTrue("Only one backup node should be able to start", false);
- } catch(IOException e) {
- assertTrue(
- e.getLocalizedMessage().contains("Registration is not allowed"));
- // should fail - doing good
- }
- // stop backup node 1; backup node 2 should be able to start
- backup1.stop();
- backup1 = null;
- try {
- backup2 = startBackupNode(conf2, StartupOption.BACKUP, 2);
- } catch(IOException e) {
- assertTrue("Backup node 2 should be able to start", false);
- }
- } catch(IOException e) {
- LOG.error("Error in TestBackupNode:", e);
- assertTrue(e.getLocalizedMessage(), false);
- } finally {
- if(backup1 != null) backup1.stop();
- if(backup2 != null) backup2.stop();
- if(cluster != null) cluster.shutdown();
- }
- }
}
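
The new assertions rely on polling rather than fixed sleeps: testBNInSync wraps its check in a Supplier<Boolean> and hands it to GenericTestUtils.waitFor, which re-runs the check until it returns true or the timeout elapses. A minimal sketch of that idiom, with a made-up condition (a marker file appearing) standing in for the "path visible on the BackupNode and txid matches the NN" check used above:

import java.io.File;

import com.google.common.base.Supplier;
import org.apache.hadoop.test.GenericTestUtils;

class WaitForConditionSketch {
  static void awaitFile(final File f) throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        // Re-evaluated every 30 ms until true, or until the 10 s timeout
        // expires and waitFor fails the test with a TimeoutException.
        return f.exists();
      }
    }, 30, 10000);
  }
}
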