You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/14 22:11:41 UTC

svn commit: r1373061 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/serve...

Author: todd
Date: Tue Aug 14 20:11:40 2012
New Revision: 1373061

URL: http://svn.apache.org/viewvc?rev=1373061&view=rev
Log:
HDFS-3765. namenode -initializeSharedEdits should be able to initialize all shared storages. Contributed by Vinay and Todd Lipcon.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1373061&r1=1373060&r2=1373061&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Aug 14 20:11:40 2012
@@ -383,6 +383,9 @@ Branch-2 ( Unreleased changes )
 
     HDFS-3276. initializeSharedEdits should have a -nonInteractive flag (todd)
 
+    HDFS-3765. namenode -initializeSharedEdits should be able to initialize
+    all shared storages. (Vinay and todd via todd)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java?rev=1373061&r1=1373060&r2=1373061&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java Tue Aug 14 20:11:40 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.contrib.bkjournal;
 
 import static org.junit.Assert.*;
+
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
@@ -25,6 +26,9 @@ import org.junit.BeforeClass;
 import org.junit.AfterClass;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 import org.apache.hadoop.hdfs.HAUtil;
@@ -35,12 +39,16 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 
 import org.apache.hadoop.ipc.RemoteException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ExitUtil.ExitException;
 
 import org.apache.bookkeeper.proto.BookieServer;
@@ -48,7 +56,9 @@ import org.apache.bookkeeper.proto.Booki
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URISyntaxException;
 
 /**
  * Integration test to ensure that the BookKeeper JournalManager
@@ -67,6 +77,11 @@ public class TestBookKeeperAsHASharedDir
     bkutil = new BKJMUtil(numBookies);
     bkutil.start();
   }
+  
+  @Before
+  public void clearExitStatus() {
+    ExitUtil.resetFirstExitException();
+  }
 
   @AfterClass
   public static void teardownBookkeeper() throws Exception {
@@ -244,4 +259,97 @@ public class TestBookKeeperAsHASharedDir
       }
     }
   }
+  
+  /**
+   * Use NameNode INTIALIZESHAREDEDITS to initialize the shared edits. i.e. copy
+   * the edits log segments to new bkjm shared edits.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testInitializeBKSharedEdits() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      HAUtil.setAllowStandbyReads(conf, true);
+      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
+      MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
+      cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
+          .numDataNodes(0).build();
+      cluster.waitActive();
+      // Shutdown and clear the current filebased shared dir.
+      cluster.shutdownNameNodes();
+      File shareddir = new File(cluster.getSharedEditsDir(0, 1));
+      assertTrue("Initial Shared edits dir not fully deleted",
+          FileUtil.fullyDelete(shareddir));
+
+      // Check namenodes should not start without shared dir.
+      assertCanNotStartNamenode(cluster, 0);
+      assertCanNotStartNamenode(cluster, 1);
+
+      // Configure bkjm as new shared edits dir in both namenodes
+      Configuration nn1Conf = cluster.getConfiguration(0);
+      Configuration nn2Conf = cluster.getConfiguration(1);
+      nn1Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
+          .createJournalURI("/initializeSharedEdits").toString());
+      nn2Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
+          .createJournalURI("/initializeSharedEdits").toString());
+      BKJMUtil.addJournalManagerDefinition(nn1Conf);
+      BKJMUtil.addJournalManagerDefinition(nn2Conf);
+
+      // Initialize the BKJM shared edits.
+      assertFalse(NameNode.initializeSharedEdits(nn1Conf));
+
+      // NameNode should be able to start and should be in sync with BKJM as
+      // shared dir
+      assertCanStartHANameNodes(cluster, conf, "/testBKJMInitialize");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void assertCanNotStartNamenode(MiniDFSCluster cluster, int nnIndex) {
+    try {
+      cluster.restartNameNode(nnIndex, false);
+      fail("Should not have been able to start NN" + (nnIndex)
+          + " without shared dir");
+    } catch (IOException ioe) {
+      LOG.info("Got expected exception", ioe);
+      GenericTestUtils.assertExceptionContains(
+          "Cannot start an HA namenode with name dirs that need recovery", ioe);
+    }
+  }
+
+  private void assertCanStartHANameNodes(MiniDFSCluster cluster,
+      Configuration conf, String path) throws ServiceFailedException,
+      IOException, URISyntaxException, InterruptedException {
+    // Now should be able to start both NNs. Pass "false" here so that we don't
+    // try to waitActive on all NNs, since the second NN doesn't exist yet.
+    cluster.restartNameNode(0, false);
+    cluster.restartNameNode(1, true);
+
+    // Make sure HA is working.
+    cluster
+        .getNameNode(0)
+        .getRpcServer()
+        .transitionToActive(
+            new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
+    FileSystem fs = null;
+    try {
+      Path newPath = new Path(path);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(newPath));
+      HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
+          cluster.getNameNode(1));
+      assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          newPath.toString(), false).isDir());
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1373061&r1=1373060&r2=1373061&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Aug 14 20:11:40 2012
@@ -17,18 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,10 +36,10 @@ import org.apache.hadoop.ha.HealthCheckF
 import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Trash;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -53,9 +47,6 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
-import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@@ -68,8 +59,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -88,6 +77,7 @@ import static org.apache.hadoop.util.Exi
 import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**********************************************************
@@ -767,9 +757,18 @@ public class NameNode {
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     initializeGenericKeys(conf, nsId, namenodeId);
+    
+    if (conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY) == null) {
+      LOG.fatal("No shared edits directory configured for namespace " +
+          nsId + " namenode " + namenodeId);
+      return false;
+    }
+
     NNStorage existingStorage = null;
     try {
-      FSNamesystem fsns = FSNamesystem.loadFromDisk(conf,
+      Configuration confWithoutShared = new Configuration(conf);
+      confWithoutShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+      FSNamesystem fsns = FSNamesystem.loadFromDisk(confWithoutShared,
           FSNamesystem.getNamespaceDirs(conf),
           FSNamesystem.getNamespaceEditsDirs(conf, false));
       
@@ -799,11 +798,9 @@ public class NameNode {
       fsns.getFSImage().getEditLog().close();
       fsns.getFSImage().getEditLog().initJournalsForWrite();
       fsns.getFSImage().getEditLog().recoverUnclosedStreams();
-      
-      if (copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs,
-          newSharedStorage, conf)) {
-        return true; // aborted
-      }
+
+      copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, newSharedStorage,
+          conf);
     } catch (IOException ioe) {
       LOG.error("Could not initialize shared edits dir", ioe);
       return true; // aborted
@@ -821,43 +818,59 @@ public class NameNode {
     }
     return false; // did not abort
   }
-  
-  private static boolean copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
+
+  private static void copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
       Collection<URI> sharedEditsDirs, NNStorage newSharedStorage,
-      Configuration conf) throws FileNotFoundException, IOException {
+      Configuration conf) throws IOException {
+    Preconditions.checkArgument(!sharedEditsDirs.isEmpty(),
+        "No shared edits specified");
     // Copy edit log segments into the new shared edits dir.
-    for (JournalAndStream jas : fsns.getFSImage().getEditLog().getJournals()) {
-      FileJournalManager fjm = null;
-      if (!(jas.getManager() instanceof FileJournalManager)) {
-        LOG.error("Cannot populate shared edits dir from non-file " +
-            "journal manager: " + jas.getManager());
-        return true; // aborted
-      } else {
-        fjm = (FileJournalManager) jas.getManager();
-      }
-      for (EditLogFile elf : fjm.getLogFiles(fsns.getFSImage()
-          .getMostRecentCheckpointTxId())) {
-        File editLogSegment = elf.getFile();
-        for (URI sharedEditsUri : sharedEditsDirs) {
-          StorageDirectory sharedEditsDir = newSharedStorage
-              .getStorageDirectory(sharedEditsUri);
-          File targetFile = new File(sharedEditsDir.getCurrentDir(),
-              editLogSegment.getName());
-          if (!targetFile.exists()) {
-            InputStream in = null;
-            OutputStream out = null;
-            try {
-              in = new FileInputStream(editLogSegment);
-              out = new AtomicFileOutputStream(targetFile);
-              IOUtils.copyBytes(in, out, conf);
-            } finally {
-              IOUtils.cleanup(LOG, in, out);
-            }
-          }
+    List<URI> sharedEditsUris = new ArrayList<URI>(sharedEditsDirs);
+    FSEditLog newSharedEditLog = new FSEditLog(conf, newSharedStorage,
+        sharedEditsUris);
+    newSharedEditLog.initJournalsForWrite();
+    newSharedEditLog.recoverUnclosedStreams();
+    
+    FSEditLog sourceEditLog = fsns.getFSImage().editLog;
+    
+    long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId();
+    Collection<EditLogInputStream> streams = sourceEditLog.selectInputStreams(
+        fromTxId+1, 0);
+
+    // Set the nextTxid to the CheckpointTxId+1
+    newSharedEditLog.setNextTxId(fromTxId + 1);
+    
+    // Copy all edits after last CheckpointTxId to shared edits dir
+    for (EditLogInputStream stream : streams) {
+      LOG.debug("Beginning to copy stream " + stream + " to shared edits");
+      FSEditLogOp op;
+      boolean segmentOpen = false;
+      while ((op = stream.readOp()) != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("copying op: " + op);
+        }
+        if (!segmentOpen) {
+          newSharedEditLog.startLogSegment(op.txid, false);
+          segmentOpen = true;
+        }
+        
+        newSharedEditLog.logEdit(op);
+
+        if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
+          newSharedEditLog.logSync();
+          newSharedEditLog.endCurrentLogSegment(false);
+          LOG.debug("ending log segment because of END_LOG_SEGMENT op in " + stream);
+          segmentOpen = false;
         }
       }
+      
+      if (segmentOpen) {
+        LOG.debug("ending log segment because of end of stream in " + stream);
+        newSharedEditLog.logSync();
+        newSharedEditLog.endCurrentLogSegment(false);
+        segmentOpen = false;
+      }
     }
-    return false; // did not abort
   }
 
   private static boolean finalize(Configuration conf,

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java?rev=1373061&r1=1373060&r2=1373061&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java Tue Aug 14 20:11:40 2012
@@ -65,7 +65,7 @@ public abstract class HATestUtil {
    * @throws CouldNotCatchUpException if the standby doesn't catch up to the
    *         active in NN_LAG_TIMEOUT milliseconds
    */
-  static void waitForStandbyToCatchUp(NameNode active,
+  public static void waitForStandbyToCatchUp(NameNode active,
       NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
     
     long activeTxId = active.getNamesystem().getFSImage().getEditLog()

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java?rev=1373061&r1=1373060&r2=1373061&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java Tue Aug 14 20:11:40 2012
@@ -159,6 +159,13 @@ public class TestInitializeSharedEdits {
   }
   
   @Test
+  public void testFailWhenNoSharedEditsSpecified() throws Exception {
+    Configuration confNoShared = new Configuration(conf);
+    confNoShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+    assertFalse(NameNode.initializeSharedEdits(confNoShared, true));
+  }
+  
+  @Test
   public void testDontOverWriteExistingDir() {
     assertFalse(NameNode.initializeSharedEdits(conf, false));
     assertTrue(NameNode.initializeSharedEdits(conf, false));