You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2011/06/12 05:54:12 UTC

svn commit: r1134860 - in /hadoop/hdfs/branches/branch-0.22: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: eli
Date: Sun Jun 12 03:54:12 2011
New Revision: 1134860

URL: http://svn.apache.org/viewvc?rev=1134860&view=rev
Log:
HDFS-988. saveNamespace race can corrupt the edits log. Contributed by Eli Collins

Modified:
    hadoop/hdfs/branches/branch-0.22/CHANGES.txt
    hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
    hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java

Modified: hadoop/hdfs/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/CHANGES.txt?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.22/CHANGES.txt Sun Jun 12 03:54:12 2011
@@ -576,6 +576,8 @@ Release 0.22.0 - Unreleased
     HDFS-2039. TestNameNodeMetrics uses a bad test root path, preventing it
     from running inside Eclipse. (todd)
 
+    HDFS-988. saveNamespace race can corrupt the edits log. (eli)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sun Jun 12 03:54:12 2011
@@ -1864,15 +1864,10 @@ class FSDirectory implements Closeable {
   void setQuota(String src, long nsQuota, long dsQuota) 
     throws FileNotFoundException, QuotaExceededException,
     UnresolvedLinkException { 
-    writeLock();
-    try {
-      INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
-      if (dir != null) {
-        fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(), 
-                                         dir.getDsQuota());
-      }
-    } finally {
-      writeUnlock();
+    INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
+    if (dir != null) {
+      fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(),
+                                       dir.getDsQuota());
     }
   }
   

Modified: hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Jun 12 03:54:12 2011
@@ -522,9 +522,13 @@ public class FSNamesystem implements FSC
   }
   
   NamespaceInfo getNamespaceInfo() {
-    return new NamespaceInfo(dir.fsImage.getNamespaceID(),
-                             dir.fsImage.getCTime(),
-                             getDistributedUpgradeVersion());
+    readLock();
+    try {
+      return new NamespaceInfo(dir.fsImage.getNamespaceID(),
+          dir.fsImage.getCTime(), getDistributedUpgradeVersion());
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
@@ -1683,6 +1687,11 @@ public class FSNamesystem implements FSC
       UnresolvedLinkException, IOException {
     writeLock();
     try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot addbandon block " + b +
+            " for file "+src, safeMode);
+      }
+
     //
     // Remove the block from the pending creates list
     //
@@ -2184,12 +2193,17 @@ public class FSNamesystem implements FSC
    */
   void setQuota(String path, long nsQuota, long dsQuota) 
       throws IOException, UnresolvedLinkException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set quota on " + path, safeMode);
-    if (isPermissionEnabled) {
-      checkSuperuserPrivilege();
+    writeLock();
+    try {
+      if (isInSafeMode())
+        throw new SafeModeException("Cannot set quota on " + path, safeMode);
+      if (isPermissionEnabled) {
+        checkSuperuserPrivilege();
+      }
+      dir.setQuota(path, nsQuota, dsQuota);
+    } finally {
+      writeUnlock();
     }
-    dir.setQuota(path, nsQuota, dsQuota);
     getEditLog().logSync();
   }
   
@@ -2213,6 +2227,7 @@ public class FSNamesystem implements FSC
     } finally {
       writeUnlock();
     }
+    getEditLog().logSync();
   }
 
   /**
@@ -2392,6 +2407,10 @@ public class FSNamesystem implements FSC
           + ", closeFile=" + closeFile
           + ", deleteBlock=" + deleteblock
           + ")");
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot commitBlockSynchronization "
+          + lastblock, safeMode);
+    }
     final BlockInfo storedBlock = blockManager.getStoredBlock(lastblock);
     if (storedBlock == null) {
       throw new IOException("Block (=" + lastblock + ") not found");
@@ -2477,9 +2496,14 @@ public class FSNamesystem implements FSC
    * Renew the lease(s) held by the given client
    */
   void renewLease(String holder) throws IOException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
-    leaseManager.renewLease(holder);
+    writeLock();
+    try {
+      if (isInSafeMode())
+        throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
+      leaseManager.renewLease(holder);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -4241,6 +4265,7 @@ public class FSNamesystem implements FSC
       return;
     }
     safeMode.setManual();
+    getEditLog().logSyncAll();
     NameNode.stateChangeLog.info("STATE* Safe mode is ON. " 
                                 + safeMode.getTurnOffTip());
     } finally {
@@ -4587,9 +4612,14 @@ public class FSNamesystem implements FSC
   /**
    * Increments, logs and then returns the stamp
    */
-  long nextGenerationStamp() {
+  private long nextGenerationStamp() throws IOException {
+    if (isInSafeMode()) {
+      throw new SafeModeException(
+          "Cannot get next generation stamp", safeMode);
+    }
     long gs = generationStamp.nextStamp();
     getEditLog().logGenerationStamp(gs);
+    // NB: callers sync the log
     return gs;
   }
 
@@ -4639,6 +4669,7 @@ public class FSNamesystem implements FSC
    */
   LocatedBlock updateBlockForPipeline(Block block, 
       String clientName) throws IOException {
+    LocatedBlock locatedBlock;
     writeLock();
     try {
     // check vadility of parameters
@@ -4646,15 +4677,16 @@ public class FSNamesystem implements FSC
     
     // get a new generation stamp and an access token
     block.setGenerationStamp(nextGenerationStamp());
-    LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+    locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
     if (isBlockTokenEnabled) {
       locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
           block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
     }
-    return locatedBlock;
     } finally {
       writeUnlock();
     }
+    getEditLog().logSync();
+    return locatedBlock;
   }
   
   

Modified: hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Sun Jun 12 03:54:12 2011
@@ -26,7 +26,6 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.net.URL;
 import java.net.URLConnection;
 import java.security.PrivilegedExceptionAction;
@@ -57,12 +56,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
 
-import static org.junit.Assert.*;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -508,6 +505,15 @@ public class DFSTestUtil {
     IOUtils.copyBytes(is, os, s.length(), true);
   }
 
+  /* Append the given string to the given file */
+  public static void appendFile(FileSystem fs, Path p, String s) 
+      throws IOException {
+    assert fs.exists(p);
+    InputStream is = new ByteArrayInputStream(s.getBytes());
+    FSDataOutputStream os = fs.append(p);
+    IOUtils.copyBytes(is, os, s.length(), true);
+  }
+
   // Returns url content as string.
   public static String urlGet(URL url) throws IOException {
     URLConnection conn = url.openConnection();

Modified: hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java Sun Jun 12 03:54:12 2011
@@ -20,20 +20,44 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
 
 /**
  * Tests to verify safe mode correctness.
  */
-public class TestSafeMode extends TestCase {
-  
-  static Log LOG = LogFactory.getLog(TestSafeMode.class);
+public class TestSafeMode {
+  Configuration conf; 
+  MiniDFSCluster cluster;
+  FileSystem fs;
+  DistributedFileSystem dfs;
+
+  @Before
+  public void startUp() throws IOException {
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();      
+    fs = cluster.getFileSystem();
+    dfs = (DistributedFileSystem)fs;
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (fs != null) {
+      fs.close();
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
 
   /**
    * This test verifies that if SafeMode is manually entered, name-node does not
@@ -51,61 +75,123 @@ public class TestSafeMode extends TestCa
    *  
    * @throws IOException
    */
-  public void testManualSafeMode() throws IOException {
-    MiniDFSCluster cluster = null;
-    DistributedFileSystem fs = null;
+  @Test
+  public void testManualSafeMode() throws IOException {      
+    fs = (DistributedFileSystem)cluster.getFileSystem();
+    Path file1 = new Path("/tmp/testManualSafeMode/file1");
+    Path file2 = new Path("/tmp/testManualSafeMode/file2");
+    
+    // create two files with one block each.
+    DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
+    DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
+    fs.close();
+    cluster.shutdown();
+    
+    // now bring up just the NameNode.
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
+    cluster.waitActive();
+    dfs = (DistributedFileSystem)cluster.getFileSystem();
+    
+    assertTrue("No datanode is started. Should be in SafeMode", 
+               dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+    
+    // manually set safemode.
+    dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    
+    // now bring up the datanode and wait for it to be active.
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.waitActive();
+    
+    // wait longer than dfs.namenode.safemode.extension
     try {
-      Configuration conf = new HdfsConfiguration();
-      // disable safemode extension to make the test run faster.
-      conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, "1");
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      cluster.waitActive();
-      
-      fs = (DistributedFileSystem)cluster.getFileSystem();
-      Path file1 = new Path("/tmp/testManualSafeMode/file1");
-      Path file2 = new Path("/tmp/testManualSafeMode/file2");
-      
-      LOG.info("Created file1 and file2.");
-      
-      // create two files with one block each.
-      DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
-      DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
-      fs.close();
-      cluster.shutdown();
-      
-      // now bring up just the NameNode.
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
-      cluster.waitActive();
-      fs = (DistributedFileSystem)cluster.getFileSystem();
-      
-      LOG.info("Restarted cluster with just the NameNode");
-      
-      assertTrue("No datanode is started. Should be in SafeMode", 
-                 fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-      
-      // manually set safemode.
-      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-      
-      // now bring up the datanode and wait for it to be active.
-      cluster.startDataNodes(conf, 1, true, null, null);
-      cluster.waitActive();
-      
-      LOG.info("Datanode is started.");
-
-      // wait longer than dfs.safemode.extension
-      try {
-        Thread.sleep(2000);
-      } catch (InterruptedException ignored) {}
-      
-      assertTrue("should still be in SafeMode",
-          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-      
-      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
-      assertFalse("should not be in SafeMode",
-          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-    } finally {
-      if(fs != null) fs.close();
-      if(cluster!= null) cluster.shutdown();
+      Thread.sleep(2000);
+    } catch (InterruptedException ignored) {}
+
+    assertTrue("should still be in SafeMode",
+        dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+    assertFalse("should not be in SafeMode", 
+        dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE));
+  }
+
+  public interface FSRun {
+    public abstract void run(FileSystem fs) throws IOException;
+  }
+
+  /**
+   * Assert that the given function fails to run due to a safe 
+   * mode exception.
+   */
+  public void runFsFun(String msg, FSRun f) {
+    try {
+      f.run(fs);
+      fail(msg);
+     } catch (IOException ioe) {
+       assertTrue(ioe.getMessage().contains("safe mode"));
+     }
+  }
+
+  /**
+   * Run various fs operations while the NN is in safe mode,
+   * assert that they are either allowed or fail as expected.
+   */
+  @Test
+  public void testOperationsWhileInSafeMode() throws IOException {
+    final Path file1 = new Path("/file1");
+
+    assertFalse(dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+    DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
+    assertTrue("Could not enter SM", 
+        dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER));
+
+    runFsFun("Set quota while in SM", new FSRun() { 
+      public void run(FileSystem fs) throws IOException {
+        ((DistributedFileSystem)fs).setQuota(file1, 1, 1); 
+      }});
+
+    runFsFun("Set perm while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.setPermission(file1, FsPermission.getDefault());
+      }});
+
+    runFsFun("Set owner while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.setOwner(file1, "user", "group");
+      }});
+
+    runFsFun("Set repl while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.setReplication(file1, (short)1);
+      }});
+
+    runFsFun("Append file while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        DFSTestUtil.appendFile(fs, file1, "new bytes");
+      }});
+
+    runFsFun("Delete file while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.delete(file1, false);
+      }});
+
+    runFsFun("Rename file while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.rename(file1, new Path("file2"));
+      }});
+
+    try {
+      fs.setTimes(file1, 0, 0);
+    } catch (IOException ioe) {
+      fail("Set times failed while in SM");
     }
+
+    try {
+      DFSTestUtil.readFile(fs, file1);
+    } catch (IOException ioe) {
+      fail("Set times failed while in SM");
+    }
+
+    assertFalse("Could not leave SM",
+        dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE));
   }
-}
+  
+}
\ No newline at end of file

Modified: hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java?rev=1134860&r1=1134859&r2=1134860&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java Sun Jun 12 03:54:12 2011
@@ -20,109 +20,31 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import junit.framework.TestCase;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 /**
  * Tests to verify safe mode correctness.
  */
-public class TestSafeMode extends TestCase {
+public class TestSafeMode {
   
-  static Log LOG = LogFactory.getLog(TestSafeMode.class);
-
-  /**
-   * This test verifies that if SafeMode is manually entered, name-node does not
-   * come out of safe mode even after the startup safe mode conditions are met.
-   * <ol>
-   * <li>Start cluster with 1 data-node.</li>
-   * <li>Create 2 files with replication 1.</li>
-   * <li>Re-start cluster with 0 data-nodes. 
-   * Name-node should stay in automatic safe-mode.</li>
-   * <li>Enter safe mode manually.</li>
-   * <li>Start the data-node.</li>
-   * <li>Wait longer than <tt>dfs.safemode.extension</tt> and 
-   * verify that the name-node is still in safe mode.</li>
-   * </ol>
-   *  
-   * @throws IOException
-   */
-  public void testManualSafeMode() throws IOException {
-    MiniDFSCluster cluster = null;
-    DistributedFileSystem fs = null;
-    try {
-      Configuration conf = new HdfsConfiguration();
-      // disable safemode extension to make the test run faster.
-      conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, "1");
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      cluster.waitActive();
-      
-      fs = (DistributedFileSystem)cluster.getFileSystem();
-      Path file1 = new Path("/tmp/testManualSafeMode/file1");
-      Path file2 = new Path("/tmp/testManualSafeMode/file2");
-      
-      LOG.info("Created file1 and file2.");
-      
-      // create two files with one block each.
-      DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
-      DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
-      fs.close();
-      cluster.shutdown();
-      
-      // now bring up just the NameNode.
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
-      cluster.waitActive();
-      fs = (DistributedFileSystem)cluster.getFileSystem();
-      
-      LOG.info("Restarted cluster with just the NameNode");
-      
-      assertTrue("No datanode is started. Should be in SafeMode", 
-                 fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-      
-      // manually set safemode.
-      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-      
-      // now bring up the datanode and wait for it to be active.
-      cluster.startDataNodes(conf, 1, true, null, null);
-      cluster.waitActive();
-      
-      LOG.info("Datanode is started.");
-
-      // wait longer than dfs.safemode.extension
-      try {
-        Thread.sleep(2000);
-      } catch (InterruptedException ignored) {}
-      
-      assertTrue("should still be in SafeMode",
-          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-      
-      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
-      assertFalse("should not be in SafeMode",
-          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-    } finally {
-      if(fs != null) fs.close();
-      if(cluster!= null) cluster.shutdown();
-    }
-  }
-
-
   /**
    * Verify that the NameNode stays in safemode when dfs.safemode.datanode.min
    * is set to a number greater than the number of live datanodes.
    */
+  @Test
   public void testDatanodeThreshold() throws IOException {
     MiniDFSCluster cluster = null;
     DistributedFileSystem fs = null;
     try {
-      Configuration conf = new Configuration();
+      Configuration conf = new HdfsConfiguration();
       conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
       conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);