You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2011/06/12 05:54:12 UTC
svn commit: r1134860 - in /hadoop/hdfs/branches/branch-0.22: ./
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: eli
Date: Sun Jun 12 03:54:12 2011
New Revision: 1134860
URL: http://svn.apache.org/viewvc?rev=1134860&view=rev
Log:
HDFS-988. saveNamespace race can corrupt the edits log. Contributed by Eli Collins
Modified:
hadoop/hdfs/branches/branch-0.22/CHANGES.txt
hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java
Modified: hadoop/hdfs/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/CHANGES.txt?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.22/CHANGES.txt Sun Jun 12 03:54:12 2011
@@ -576,6 +576,8 @@ Release 0.22.0 - Unreleased
HDFS-2039. TestNameNodeMetrics uses a bad test root path, preventing it
from running inside Eclipse. (todd)
+ HDFS-988. saveNamespace race can corrupt the edits log. (eli)
+
Release 0.21.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sun Jun 12 03:54:12 2011
@@ -1864,15 +1864,10 @@ class FSDirectory implements Closeable {
void setQuota(String src, long nsQuota, long dsQuota)
throws FileNotFoundException, QuotaExceededException,
UnresolvedLinkException {
- writeLock();
- try {
- INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
- if (dir != null) {
- fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(),
- dir.getDsQuota());
- }
- } finally {
- writeUnlock();
+ INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
+ if (dir != null) {
+ fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(),
+ dir.getDsQuota());
}
}
Modified: hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Jun 12 03:54:12 2011
@@ -522,9 +522,13 @@ public class FSNamesystem implements FSC
}
NamespaceInfo getNamespaceInfo() {
- return new NamespaceInfo(dir.fsImage.getNamespaceID(),
- dir.fsImage.getCTime(),
- getDistributedUpgradeVersion());
+ readLock();
+ try {
+ return new NamespaceInfo(dir.fsImage.getNamespaceID(),
+ dir.fsImage.getCTime(), getDistributedUpgradeVersion());
+ } finally {
+ readUnlock();
+ }
}
/**
@@ -1683,6 +1687,11 @@ public class FSNamesystem implements FSC
UnresolvedLinkException, IOException {
writeLock();
try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot addbandon block " + b +
+ " for file "+src, safeMode);
+ }
+
//
// Remove the block from the pending creates list
//
@@ -2184,12 +2193,17 @@ public class FSNamesystem implements FSC
*/
void setQuota(String path, long nsQuota, long dsQuota)
throws IOException, UnresolvedLinkException {
- if (isInSafeMode())
- throw new SafeModeException("Cannot set quota on " + path, safeMode);
- if (isPermissionEnabled) {
- checkSuperuserPrivilege();
+ writeLock();
+ try {
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot set quota on " + path, safeMode);
+ if (isPermissionEnabled) {
+ checkSuperuserPrivilege();
+ }
+ dir.setQuota(path, nsQuota, dsQuota);
+ } finally {
+ writeUnlock();
}
- dir.setQuota(path, nsQuota, dsQuota);
getEditLog().logSync();
}
@@ -2213,6 +2227,7 @@ public class FSNamesystem implements FSC
} finally {
writeUnlock();
}
+ getEditLog().logSync();
}
/**
@@ -2392,6 +2407,10 @@ public class FSNamesystem implements FSC
+ ", closeFile=" + closeFile
+ ", deleteBlock=" + deleteblock
+ ")");
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot commitBlockSynchronization "
+ + lastblock, safeMode);
+ }
final BlockInfo storedBlock = blockManager.getStoredBlock(lastblock);
if (storedBlock == null) {
throw new IOException("Block (=" + lastblock + ") not found");
@@ -2477,9 +2496,14 @@ public class FSNamesystem implements FSC
* Renew the lease(s) held by the given client
*/
void renewLease(String holder) throws IOException {
- if (isInSafeMode())
- throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
- leaseManager.renewLease(holder);
+ writeLock();
+ try {
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
+ leaseManager.renewLease(holder);
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -4241,6 +4265,7 @@ public class FSNamesystem implements FSC
return;
}
safeMode.setManual();
+ getEditLog().logSyncAll();
NameNode.stateChangeLog.info("STATE* Safe mode is ON. "
+ safeMode.getTurnOffTip());
} finally {
@@ -4587,9 +4612,14 @@ public class FSNamesystem implements FSC
/**
* Increments, logs and then returns the stamp
*/
- long nextGenerationStamp() {
+ private long nextGenerationStamp() throws IOException {
+ if (isInSafeMode()) {
+ throw new SafeModeException(
+ "Cannot get next generation stamp", safeMode);
+ }
long gs = generationStamp.nextStamp();
getEditLog().logGenerationStamp(gs);
+ // NB: callers sync the log
return gs;
}
@@ -4639,6 +4669,7 @@ public class FSNamesystem implements FSC
*/
LocatedBlock updateBlockForPipeline(Block block,
String clientName) throws IOException {
+ LocatedBlock locatedBlock;
writeLock();
try {
// check vadility of parameters
@@ -4646,15 +4677,16 @@ public class FSNamesystem implements FSC
// get a new generation stamp and an access token
block.setGenerationStamp(nextGenerationStamp());
- LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+ locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
if (isBlockTokenEnabled) {
locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
- return locatedBlock;
} finally {
writeUnlock();
}
+ getEditLog().logSync();
+ return locatedBlock;
}
Modified: hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Sun Jun 12 03:54:12 2011
@@ -26,7 +26,6 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
-import java.net.Socket;
import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;
@@ -57,12 +56,10 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
-import static org.junit.Assert.*;
/** Utilities for HDFS tests */
public class DFSTestUtil {
@@ -508,6 +505,15 @@ public class DFSTestUtil {
IOUtils.copyBytes(is, os, s.length(), true);
}
+ /* Append the given string to the given file */
+ public static void appendFile(FileSystem fs, Path p, String s)
+ throws IOException {
+ assert fs.exists(p);
+ InputStream is = new ByteArrayInputStream(s.getBytes());
+ FSDataOutputStream os = fs.append(p);
+ IOUtils.copyBytes(is, os, s.length(), true);
+ }
+
// Returns url content as string.
public static String urlGet(URL url) throws IOException {
URLConnection conn = url.openConnection();
Modified: hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java Sun Jun 12 03:54:12 2011
@@ -20,20 +20,44 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
/**
* Tests to verify safe mode correctness.
*/
-public class TestSafeMode extends TestCase {
-
- static Log LOG = LogFactory.getLog(TestSafeMode.class);
+public class TestSafeMode {
+ Configuration conf;
+ MiniDFSCluster cluster;
+ FileSystem fs;
+ DistributedFileSystem dfs;
+
+ @Before
+ public void startUp() throws IOException {
+ conf = new HdfsConfiguration();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ dfs = (DistributedFileSystem)fs;
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (fs != null) {
+ fs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
/**
* This test verifies that if SafeMode is manually entered, name-node does not
@@ -51,61 +75,123 @@ public class TestSafeMode extends TestCa
*
* @throws IOException
*/
- public void testManualSafeMode() throws IOException {
- MiniDFSCluster cluster = null;
- DistributedFileSystem fs = null;
+ @Test
+ public void testManualSafeMode() throws IOException {
+ fs = (DistributedFileSystem)cluster.getFileSystem();
+ Path file1 = new Path("/tmp/testManualSafeMode/file1");
+ Path file2 = new Path("/tmp/testManualSafeMode/file2");
+
+ // create two files with one block each.
+ DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
+ DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
+ fs.close();
+ cluster.shutdown();
+
+ // now bring up just the NameNode.
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
+ cluster.waitActive();
+ dfs = (DistributedFileSystem)cluster.getFileSystem();
+
+ assertTrue("No datanode is started. Should be in SafeMode",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+
+ // manually set safemode.
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+
+ // now bring up the datanode and wait for it to be active.
+ cluster.startDataNodes(conf, 1, true, null, null);
+ cluster.waitActive();
+
+ // wait longer than dfs.namenode.safemode.extension
try {
- Configuration conf = new HdfsConfiguration();
- // disable safemode extension to make the test run faster.
- conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, "1");
- cluster = new MiniDFSCluster.Builder(conf).build();
- cluster.waitActive();
-
- fs = (DistributedFileSystem)cluster.getFileSystem();
- Path file1 = new Path("/tmp/testManualSafeMode/file1");
- Path file2 = new Path("/tmp/testManualSafeMode/file2");
-
- LOG.info("Created file1 and file2.");
-
- // create two files with one block each.
- DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
- DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
- fs.close();
- cluster.shutdown();
-
- // now bring up just the NameNode.
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
- cluster.waitActive();
- fs = (DistributedFileSystem)cluster.getFileSystem();
-
- LOG.info("Restarted cluster with just the NameNode");
-
- assertTrue("No datanode is started. Should be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-
- // manually set safemode.
- fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-
- // now bring up the datanode and wait for it to be active.
- cluster.startDataNodes(conf, 1, true, null, null);
- cluster.waitActive();
-
- LOG.info("Datanode is started.");
-
- // wait longer than dfs.safemode.extension
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {}
-
- assertTrue("should still be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-
- fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
- assertFalse("should not be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
- } finally {
- if(fs != null) fs.close();
- if(cluster!= null) cluster.shutdown();
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {}
+
+ assertTrue("should still be in SafeMode",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+ assertFalse("should not be in SafeMode",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE));
+ }
+
+ public interface FSRun {
+ public abstract void run(FileSystem fs) throws IOException;
+ }
+
+ /**
+ * Assert that the given function fails to run due to a safe
+ * mode exception.
+ */
+ public void runFsFun(String msg, FSRun f) {
+ try {
+ f.run(fs);
+ fail(msg);
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().contains("safe mode"));
+ }
+ }
+
+ /**
+ * Run various fs operations while the NN is in safe mode,
+ * assert that they are either allowed or fail as expected.
+ */
+ @Test
+ public void testOperationsWhileInSafeMode() throws IOException {
+ final Path file1 = new Path("/file1");
+
+ assertFalse(dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+ DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
+ assertTrue("Could not enter SM",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER));
+
+ runFsFun("Set quota while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ ((DistributedFileSystem)fs).setQuota(file1, 1, 1);
+ }});
+
+ runFsFun("Set perm while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.setPermission(file1, FsPermission.getDefault());
+ }});
+
+ runFsFun("Set owner while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.setOwner(file1, "user", "group");
+ }});
+
+ runFsFun("Set repl while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.setReplication(file1, (short)1);
+ }});
+
+ runFsFun("Append file while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ DFSTestUtil.appendFile(fs, file1, "new bytes");
+ }});
+
+ runFsFun("Delete file while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.delete(file1, false);
+ }});
+
+ runFsFun("Rename file while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.rename(file1, new Path("file2"));
+ }});
+
+ try {
+ fs.setTimes(file1, 0, 0);
+ } catch (IOException ioe) {
+ fail("Set times failed while in SM");
}
+
+ try {
+ DFSTestUtil.readFile(fs, file1);
+ } catch (IOException ioe) {
+ fail("Set times failed while in SM");
+ }
+
+ assertFalse("Could not leave SM",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE));
}
-}
+
+}
\ No newline at end of file
Modified: hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java Sun Jun 12 03:54:12 2011
@@ -20,109 +20,31 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import junit.framework.TestCase;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
/**
* Tests to verify safe mode correctness.
*/
-public class TestSafeMode extends TestCase {
+public class TestSafeMode {
- static Log LOG = LogFactory.getLog(TestSafeMode.class);
-
- /**
- * This test verifies that if SafeMode is manually entered, name-node does not
- * come out of safe mode even after the startup safe mode conditions are met.
- * <ol>
- * <li>Start cluster with 1 data-node.</li>
- * <li>Create 2 files with replication 1.</li>
- * <li>Re-start cluster with 0 data-nodes.
- * Name-node should stay in automatic safe-mode.</li>
- * <li>Enter safe mode manually.</li>
- * <li>Start the data-node.</li>
- * <li>Wait longer than <tt>dfs.safemode.extension</tt> and
- * verify that the name-node is still in safe mode.</li>
- * </ol>
- *
- * @throws IOException
- */
- public void testManualSafeMode() throws IOException {
- MiniDFSCluster cluster = null;
- DistributedFileSystem fs = null;
- try {
- Configuration conf = new HdfsConfiguration();
- // disable safemode extension to make the test run faster.
- conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, "1");
- cluster = new MiniDFSCluster.Builder(conf).build();
- cluster.waitActive();
-
- fs = (DistributedFileSystem)cluster.getFileSystem();
- Path file1 = new Path("/tmp/testManualSafeMode/file1");
- Path file2 = new Path("/tmp/testManualSafeMode/file2");
-
- LOG.info("Created file1 and file2.");
-
- // create two files with one block each.
- DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
- DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
- fs.close();
- cluster.shutdown();
-
- // now bring up just the NameNode.
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
- cluster.waitActive();
- fs = (DistributedFileSystem)cluster.getFileSystem();
-
- LOG.info("Restarted cluster with just the NameNode");
-
- assertTrue("No datanode is started. Should be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-
- // manually set safemode.
- fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-
- // now bring up the datanode and wait for it to be active.
- cluster.startDataNodes(conf, 1, true, null, null);
- cluster.waitActive();
-
- LOG.info("Datanode is started.");
-
- // wait longer than dfs.safemode.extension
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {}
-
- assertTrue("should still be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-
- fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
- assertFalse("should not be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
- } finally {
- if(fs != null) fs.close();
- if(cluster!= null) cluster.shutdown();
- }
- }
-
-
/**
* Verify that the NameNode stays in safemode when dfs.safemode.datanode.min
* is set to a number greater than the number of live datanodes.
*/
+ @Test
public void testDatanodeThreshold() throws IOException {
MiniDFSCluster cluster = null;
DistributedFileSystem fs = null;
try {
- Configuration conf = new Configuration();
+ Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);