Posted to hdfs-commits@hadoop.apache.org by um...@apache.org on 2012/11/08 18:45:47 UTC

svn commit: r1407182 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/

Author: umamahesh
Date: Thu Nov  8 17:45:46 2012
New Revision: 1407182

URL: http://svn.apache.org/viewvc?rev=1407182&view=rev
Log:
HDFS-3810. Implement format() for BKJM. Contributed by Ivan Kelly.
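
In outline, this change replaces BKJM's implicit format-on-first-use with a real format(): format() now deletes any existing BookKeeper ledgers and the journal's ZooKeeper subtree before recreating the base, version and ledgers znodes, and hasSomeData() reports whether the base znode already exists. A minimal caller-side sketch follows; the helper method, its clearExistingData parameter and the journal URI handling are illustrative assumptions, not part of this commit.

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    // Hypothetical helper showing the intended flow: check for old data,
    // then explicitly format the BookKeeper journal before first use.
    static void formatBkjmJournal(Configuration conf, URI journalUri,
        NamespaceInfo nsInfo, boolean clearExistingData) throws IOException {
      BookKeeperJournalManager bkjm =
          new BookKeeperJournalManager(conf, journalUri, nsInfo);
      try {
        // hasSomeData() now reports whether the journal's base znode exists,
        // so a caller can avoid wiping an already-formatted journal by accident.
        if (bkjm.hasSomeData() && !clearExistingData) {
          throw new IOException("Journal " + journalUri + " already contains data");
        }
        // format() deletes any existing ledgers and the journal's ZK subtree,
        // then recreates the base, version and ledgers znodes for nsInfo.
        bkjm.format(nsInfo);
      } finally {
        bkjm.close();
      }
    }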



Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1407182&r1=1407181&r2=1407182&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov  8 17:45:46 2012
@@ -560,6 +560,8 @@ Release 2.0.3-alpha - Unreleased 
     HDFS-3979. For hsync, datanode should wait for the local sync to complete
     before sending ack. (Lars Hofhansl via szetszwo)
 
+    HDFS-3810. Implement format() for BKJM (Ivan Kelly via umamahesh)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1407182&r1=1407181&r2=1407182&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java Thu Nov  8 17:45:46 2012
@@ -39,6 +39,7 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.ZKUtil;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -46,6 +47,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.io.IOException;
 
 import java.net.URI;
@@ -142,13 +144,16 @@ public class BookKeeperJournalManager im
   private final Configuration conf;
   private final BookKeeper bkc;
   private final CurrentInprogress ci;
+  private final String basePath;
   private final String ledgerPath;
+  private final String versionPath;
   private final MaxTxId maxTxId;
   private final int ensembleSize;
   private final int quorumSize;
   private final String digestpw;
   private final CountDownLatch zkConnectLatch;
   private final NamespaceInfo nsInfo;
+  private boolean initialized = false;
   private LedgerHandle currentLedger = null;
 
   /**
@@ -160,16 +165,16 @@ public class BookKeeperJournalManager im
     this.nsInfo = nsInfo;
 
     String zkConnect = uri.getAuthority().replace(";", ",");
-    String zkPath = uri.getPath();
+    basePath = uri.getPath();
     ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
                                BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
     quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
                              BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
 
-    ledgerPath = zkPath + "/ledgers";
-    String maxTxIdPath = zkPath + "/maxtxid";
-    String currentInprogressNodePath = zkPath + "/CurrentInprogress";
-    String versionPath = zkPath + "/version";
+    ledgerPath = basePath + "/ledgers";
+    String maxTxIdPath = basePath + "/maxtxid";
+    String currentInprogressNodePath = basePath + "/CurrentInprogress";
+    versionPath = basePath + "/version";
     digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
                         BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
 
@@ -180,47 +185,7 @@ public class BookKeeperJournalManager im
       if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
         throw new IOException("Error connecting to zookeeper");
       }
-      if (zkc.exists(zkPath, false) == null) {
-        zkc.create(zkPath, new byte[] {'0'},
-            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      }
 
-      Stat versionStat = zkc.exists(versionPath, false);
-      if (versionStat != null) {
-        byte[] d = zkc.getData(versionPath, false, versionStat);
-        VersionProto.Builder builder = VersionProto.newBuilder();
-        TextFormat.merge(new String(d, UTF_8), builder);
-        if (!builder.isInitialized()) {
-          throw new IOException("Invalid/Incomplete data in znode");
-        }
-        VersionProto vp = builder.build();
-
-        // There's only one version at the moment
-        assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
-
-        NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
-
-        if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
-            !nsInfo.clusterID.equals(readns.getClusterID()) ||
-            !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
-          String err = String.format("Environment mismatch. Running process %s"
-                                     +", stored in ZK %s", nsInfo, readns);
-          LOG.error(err);
-          throw new IOException(err);
-        }
-      } else if (nsInfo.getNamespaceID() > 0) {
-        VersionProto.Builder builder = VersionProto.newBuilder();
-        builder.setNamespaceInfo(PBHelper.convert(nsInfo))
-          .setLayoutVersion(BKJM_LAYOUT_VERSION);
-        byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
-        zkc.create(versionPath, data,
-                   Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      }
-
-      if (zkc.exists(ledgerPath, false) == null) {
-        zkc.create(ledgerPath, new byte[] {'0'},
-            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      }
       prepareBookKeeperEnv();
       bkc = new BookKeeper(new ClientConfiguration(), zkc);
     } catch (KeeperException e) {
@@ -244,6 +209,7 @@ public class BookKeeperJournalManager im
         BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
     final CountDownLatch zkPathLatch = new CountDownLatch(1);
 
+    final AtomicBoolean success = new AtomicBoolean(false);
     StringCallback callback = new StringCallback() {
       @Override
       public void processResult(int rc, String path, Object ctx, String name) {
@@ -251,22 +217,23 @@ public class BookKeeperJournalManager im
             || KeeperException.Code.NODEEXISTS.intValue() == rc) {
           LOG.info("Successfully created bookie available path : "
               + zkAvailablePath);
-          zkPathLatch.countDown();
+          success.set(true);
         } else {
           KeeperException.Code code = KeeperException.Code.get(rc);
-          LOG
-              .error("Error : "
+          LOG.error("Error : "
                   + KeeperException.create(code, path).getMessage()
                   + ", failed to create bookie available path : "
                   + zkAvailablePath);
         }
+        zkPathLatch.countDown();
       }
     };
     ZkUtils.createFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);
 
     try {
-      if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)) {
+      if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)
+          || !success.get()) {
         throw new IOException("Couldn't create bookie available path :"
             + zkAvailablePath + ", timed out " + zkc.getSessionTimeout()
             + " millis");
@@ -281,19 +248,101 @@ public class BookKeeperJournalManager im
 
   @Override
   public void format(NamespaceInfo ns) throws IOException {
-    // Currently, BKJM automatically formats itself when first accessed.
-    // TODO: change over to explicit formatting so that the admin can
-    // clear out the BK storage when reformatting a cluster.
-    LOG.info("Not formatting " + this + " - BKJM does not currently " +
-        "support reformatting. If it has not been used before, it will" +
-        "be formatted automatically upon first use.");
+    try {
+      // delete old info
+      Stat baseStat = null;
+      Stat ledgerStat = null;
+      if ((baseStat = zkc.exists(basePath, false)) != null) {
+        if ((ledgerStat = zkc.exists(ledgerPath, false)) != null) {
+          for (EditLogLedgerMetadata l : getLedgerList(true)) {
+            try {
+              bkc.deleteLedger(l.getLedgerId());
+            } catch (BKException.BKNoSuchLedgerExistsException bke) {
+              LOG.warn("Ledger " + l.getLedgerId() + " does not exist;"
+                       + " Cannot delete.");
+            }
+          }
+        }
+        ZKUtil.deleteRecursive(zkc, basePath);
+      }
+
+      // should be clean now.
+      zkc.create(basePath, new byte[] {'0'},
+          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+      VersionProto.Builder builder = VersionProto.newBuilder();
+      builder.setNamespaceInfo(PBHelper.convert(ns))
+        .setLayoutVersion(BKJM_LAYOUT_VERSION);
+
+      byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
+      zkc.create(versionPath, data,
+                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+      zkc.create(ledgerPath, new byte[] {'0'},
+                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    } catch (KeeperException ke) {
+      LOG.error("Error accessing zookeeper to format", ke);
+      throw new IOException("Error accessing zookeeper to format", ke);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted during format", ie);
+    } catch (BKException bke) {
+      throw new IOException("Error cleaning up ledgers during format", bke);
+    }
   }
   
   @Override
   public boolean hasSomeData() throws IOException {
-    // Don't confirm format on BKJM, since format() is currently a
-    // no-op anyway
-    return false;
+    try {
+      return zkc.exists(basePath, false) != null;
+    } catch (KeeperException ke) {
+      throw new IOException("Couldn't contact zookeeper", ke);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while checking for data", ie);
+    }
+  }
+
+  synchronized private void checkEnv() throws IOException {
+    if (!initialized) {
+      try {
+        Stat versionStat = zkc.exists(versionPath, false);
+        if (versionStat == null) {
+          throw new IOException("Environment not initialized. "
+                                +"Have you forgotten to format?");
+        }
+        byte[] d = zkc.getData(versionPath, false, versionStat);
+
+        VersionProto.Builder builder = VersionProto.newBuilder();
+        TextFormat.merge(new String(d, UTF_8), builder);
+        if (!builder.isInitialized()) {
+          throw new IOException("Invalid/Incomplete data in znode");
+        }
+        VersionProto vp = builder.build();
+
+        // There's only one version at the moment
+        assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
+
+        NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
+
+        if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
+            !nsInfo.clusterID.equals(readns.getClusterID()) ||
+            !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
+          String err = String.format("Environment mismatch. Running process %s"
+                                     +", stored in ZK %s", nsInfo, readns);
+          LOG.error(err);
+          throw new IOException(err);
+        }
+
+        ci.init();
+        initialized = true;
+      } catch (KeeperException ke) {
+        throw new IOException("Cannot access ZooKeeper", ke);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while checking environment", ie);
+      }
+    }
   }
 
   /**
@@ -307,6 +356,8 @@ public class BookKeeperJournalManager im
    */
   @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
+    checkEnv();
+
     if (txId <= maxTxId.get()) {
       throw new IOException("We've already seen " + txId
           + ". A new stream cannot be created with it");
@@ -384,6 +435,8 @@ public class BookKeeperJournalManager im
   @Override
   public void finalizeLogSegment(long firstTxId, long lastTxId)
       throws IOException {
+    checkEnv();
+
     String inprogressPath = inprogressZNode(firstTxId);
     try {
       Stat inprogressStat = zkc.exists(inprogressPath, false);
@@ -537,6 +590,8 @@ public class BookKeeperJournalManager im
 
   @Override
   public void recoverUnfinalizedSegments() throws IOException {
+    checkEnv();
+
     synchronized (this) {
       try {
         List<String> children = zkc.getChildren(ledgerPath, false);
@@ -589,6 +644,8 @@ public class BookKeeperJournalManager im
   @Override
   public void purgeLogsOlderThan(long minTxIdToKeep)
       throws IOException {
+    checkEnv();
+
     for (EditLogLedgerMetadata l : getLedgerList(false)) {
       if (l.getLastTxId() < minTxIdToKeep) {
         try {
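
The write-path methods above (startLogSegment, finalizeLogSegment, recoverUnfinalizedSegments, purgeLogsOlderThan) are now gated by checkEnv(), which verifies at most once per journal-manager instance that the version znode exists and matches the running namespace, then initializes CurrentInprogress. A stripped-down sketch of that gating pattern is below; it is simplified, and the real checkEnv() also compares the namespace, cluster and block-pool IDs read from the version znode.

    // Simplified sketch of the checkEnv() gate inside BookKeeperJournalManager.
    private boolean initialized = false;

    private synchronized void checkEnv() throws IOException {
      if (initialized) {
        return;                       // verified at most once per instance
      }
      try {
        if (zkc.exists(versionPath, false) == null) {
          throw new IOException("Environment not initialized. "
                                + "Have you forgotten to format?");
        }
        // The real implementation also parses the version znode, compares the
        // stored NamespaceInfo against the running one, and calls ci.init().
        initialized = true;
      } catch (KeeperException ke) {
        throw new IOException("Cannot access ZooKeeper", ke);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while checking environment", ie);
      }
    }

    @Override
    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
      checkEnv();                     // every mutating operation is gated this way
      // ... existing purge logic ...
    }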

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java?rev=1407182&r1=1407181&r2=1407182&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java Thu Nov  8 17:45:46 2012
@@ -56,6 +56,9 @@ class CurrentInprogress {
   CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
     this.currentInprogressNode = lockpath;
     this.zkc = zkc;
+  }
+
+  void init() throws IOException {
     try {
       Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode,
                                                       false);
@@ -96,15 +99,14 @@ class CurrentInprogress {
           this.versionNumberForPermission);
     } catch (KeeperException e) {
       throw new IOException("Exception when setting the data "
-          + "[layout version number,hostname,inprogressNode path]= [" + content
-          + "] to CurrentInprogress. ", e);
+          + "[" + content + "] to CurrentInprogress. ", e);
     } catch (InterruptedException e) {
       throw new IOException("Interrupted while setting the data "
-          + "[layout version number,hostname,inprogressNode path]= [" + content
-          + "] to CurrentInprogress", e);
+          + "[" + content + "] to CurrentInprogress", e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Updated data[" + content + "] to CurrentInprogress");
     }
-    LOG.info("Updated data[layout version number,hostname,inprogressNode path]"
-        + "= [" + content + "] to CurrentInprogress");
   }
 
   /**
@@ -136,7 +138,7 @@ class CurrentInprogress {
       }
       return builder.build().getPath();
     } else {
-      LOG.info("No data available in CurrentInprogress");
+      LOG.debug("No data available in CurrentInprogress");
     }
     return null;
   }
@@ -152,7 +154,7 @@ class CurrentInprogress {
       throw new IOException(
           "Interrupted when setting the data to CurrentInprogress node", e);
     }
-    LOG.info("Cleared the data from CurrentInprogress");
+    LOG.debug("Cleared the data from CurrentInprogress");
   }
 
 }
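
With this change CurrentInprogress no longer creates its znode in the constructor; callers must invoke init() first, which BookKeeperJournalManager now does from checkEnv(), and the updated tests do explicitly. A minimal usage sketch, with the znode path, the string passed to update(), and the already-connected ZooKeeper handle 'zkc' assumed for illustration:

    // Construct, then initialize, then use.
    CurrentInprogress ci = new CurrentInprogress(zkc, "/hdfsjournal/CurrentInprogress");
    ci.init();                      // creates the znode; formerly done in the constructor
    ci.update("inprogress_0000000000000000001");
    String inprogress = ci.read(); // returns the path recorded by update()
    ci.clear();                    // removes the recorded path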

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java?rev=1407182&r1=1407181&r2=1407182&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java Thu Nov  8 17:45:46 2012
@@ -149,6 +149,7 @@ public class TestBookKeeperConfiguration
     bkjm = new BookKeeperJournalManager(conf,
         URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"),
         nsi);
+    bkjm.format(nsi);
     Assert.assertNotNull("Bookie available path : " + bkAvailablePath
         + " doesn't exists", zkc.exists(bkAvailablePath, false));
   }
@@ -166,6 +167,7 @@ public class TestBookKeeperConfiguration
     bkjm = new BookKeeperJournalManager(conf,
         URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"),
         nsi);
+    bkjm.format(nsi);
     Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH
         + " doesn't exists", zkc.exists(BK_ROOT_PATH, false));
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1407182&r1=1407181&r2=1407182&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Thu Nov  8 17:45:46 2012
@@ -29,8 +29,16 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Random;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@@ -90,6 +98,7 @@ public class TestBookKeeperJournalManage
     NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
+    bkjm.format(nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
@@ -112,6 +121,8 @@ public class TestBookKeeperJournalManage
 
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
+    bkjm.format(nsi);
+
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -130,6 +141,7 @@ public class TestBookKeeperJournalManage
     NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi);
+    bkjm.format(nsi);
 
     long txid = 1;
     for (long i = 0; i < 3; i++) {
@@ -167,6 +179,7 @@ public class TestBookKeeperJournalManage
     NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi);
+    bkjm.format(nsi);
 
     long txid = 1;
     for (long i = 0; i < 3; i++) {
@@ -208,6 +221,7 @@ public class TestBookKeeperJournalManage
     NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi);
+    bkjm.format(nsi);
 
     long txid = 1;
     long start = txid;
@@ -266,6 +280,7 @@ public class TestBookKeeperJournalManage
 
     BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
+    bkjm1.format(nsi);
 
     BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
@@ -288,6 +303,7 @@ public class TestBookKeeperJournalManage
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-simpleread"),
         nsi);
+    bkjm.format(nsi);
 
     final long numTransactions = 10000;
     EditLogOutputStream out = bkjm.startLogSegment(1);
@@ -315,6 +331,7 @@ public class TestBookKeeperJournalManage
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"),
         nsi);
+    bkjm.format(nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
@@ -365,6 +382,7 @@ public class TestBookKeeperJournalManage
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
           BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
           nsi);
+      bkjm.format(nsi);
       EditLogOutputStream out = bkjm.startLogSegment(txid);
 
       for (long i = 1 ; i <= 3; i++) {
@@ -450,6 +468,7 @@ public class TestBookKeeperJournalManage
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
           BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"),
           nsi);
+      bkjm.format(nsi);
 
       EditLogOutputStream out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
@@ -500,6 +519,7 @@ public class TestBookKeeperJournalManage
     NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
                                                                  nsi);
+    bkjm.format(nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1; i <= 100; i++) {
@@ -541,6 +561,7 @@ public class TestBookKeeperJournalManage
     NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
                                                                  nsi);
+    bkjm.format(nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1; i <= 100; i++) {
@@ -583,6 +604,7 @@ public class TestBookKeeperJournalManage
     NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
                                                                  nsi);
+    bkjm.format(nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1; i <= 100; i++) {
@@ -622,6 +644,7 @@ public class TestBookKeeperJournalManage
     NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
                                                                  nsi);
+    bkjm.format(nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1; i <= 100; i++) {
@@ -669,6 +692,7 @@ public class TestBookKeeperJournalManage
     NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
                                                                  nsi);
+    bkjm.format(nsi);
 
     try {
       // start new inprogress log segment with txid=1
@@ -697,6 +721,81 @@ public class TestBookKeeperJournalManage
     }
   }
 
+  private enum ThreadStatus {
+    COMPLETED, GOODEXCEPTION, BADEXCEPTION;
+  };
+
+  /**
+   * Tests that concurrent calls to format will still allow one to succeed.
+   */
+  @Test
+  public void testConcurrentFormat() throws Exception {
+    final URI uri = BKJMUtil.createJournalURI("/hdfsjournal-concurrentformat");
+    final NamespaceInfo nsi = newNSInfo();
+
+    // populate with data first
+    BookKeeperJournalManager bkjm
+      = new BookKeeperJournalManager(conf, uri, nsi);
+    bkjm.format(nsi);
+    for (int i = 1; i < 100*2; i += 2) {
+      bkjm.startLogSegment(i);
+      bkjm.finalizeLogSegment(i, i+1);
+    }
+    bkjm.close();
+
+    final int numThreads = 40;
+    List<Callable<ThreadStatus>> threads
+      = new ArrayList<Callable<ThreadStatus>>();
+    final CyclicBarrier barrier = new CyclicBarrier(numThreads);
+
+    for (int i = 0; i < numThreads; i++) {
+      threads.add(new Callable<ThreadStatus>() {
+          public ThreadStatus call() {
+            BookKeeperJournalManager bkjm = null;
+            try {
+              bkjm = new BookKeeperJournalManager(conf, uri, nsi);
+              barrier.await();
+              bkjm.format(nsi);
+              return ThreadStatus.COMPLETED;
+            } catch (IOException ioe) {
+              LOG.info("Exception formatting ", ioe);
+              return ThreadStatus.GOODEXCEPTION;
+            } catch (InterruptedException ie) {
+              LOG.error("Interrupted. Something is broken", ie);
+              Thread.currentThread().interrupt();
+              return ThreadStatus.BADEXCEPTION;
+            } catch (Exception e) {
+              LOG.error("Some other bad exception", e);
+              return ThreadStatus.BADEXCEPTION;
+            } finally {
+              if (bkjm != null) {
+                try {
+                  bkjm.close();
+                } catch (IOException ioe) {
+                  LOG.error("Error closing journal manager", ioe);
+                }
+              }
+            }
+          }
+        });
+    }
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+    List<Future<ThreadStatus>> statuses = service.invokeAll(threads, 60,
+                                                      TimeUnit.SECONDS);
+    int numCompleted = 0;
+    for (Future<ThreadStatus> s : statuses) {
+      assertTrue(s.isDone());
+      assertTrue("Thread threw invalid exception",
+          s.get() == ThreadStatus.COMPLETED
+          || s.get() == ThreadStatus.GOODEXCEPTION);
+      if (s.get() == ThreadStatus.COMPLETED) {
+        numCompleted++;
+      }
+    }
+    LOG.info("Completed " + numCompleted + " formats");
+    assertTrue("No thread managed to complete formatting", numCompleted > 0);
+  }
+
   private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
       int startTxid, int endTxid) throws IOException, KeeperException,
       InterruptedException {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java?rev=1407182&r1=1407181&r2=1407182&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java Thu Nov  8 17:45:46 2012
@@ -118,6 +118,7 @@ public class TestCurrentInprogress {
   public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception {
     String data = "inprogressNode";
     CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.init();
     ci.update(data);
     String inprogressNodePath = ci.read();
     assertEquals("Not returning inprogressZnode", "inprogressNode",
@@ -131,6 +132,7 @@ public class TestCurrentInprogress {
   @Test
   public void testReadShouldReturnNullAfterClear() throws Exception {
     CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.init();
     ci.update("myInprogressZnode");
     ci.read();
     ci.clear();
@@ -146,6 +148,7 @@ public class TestCurrentInprogress {
   public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead()
       throws Exception {
     CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.init();
     ci.update("myInprogressZnode");
     assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci
         .read());
@@ -154,4 +157,4 @@ public class TestCurrentInprogress {
     ci.update("myInprogressZnode");
   }
 
-}
\ No newline at end of file
+}