Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2016/10/06 13:59:04 UTC

[2/3] hadoop git commit: HDFS-10957. Retire BKJM from trunk (Vinayakumar B)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
deleted file mode 100644
index b1fc3d7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.*;
-
-import java.net.URI;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.KeeperException;
-
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.List;
-
-import java.io.IOException;
-import java.io.File;
-
-/**
- * Utility class for setting up bookkeeper ensembles
- * and bringing individual bookies up and down
- */
-class BKJMUtil {
-  protected static final Log LOG = LogFactory.getLog(BKJMUtil.class);
-
-  int nextPort = 6000; // next port for additionally created bookies
-  private Thread bkthread = null;
-  private final static String zkEnsemble = "127.0.0.1:2181";
-  int numBookies;
-
-  BKJMUtil(final int numBookies) throws Exception {
-    this.numBookies = numBookies;
-
-    bkthread = new Thread() {
-        public void run() {
-          try {
-            String[] args = new String[1];
-            args[0] = String.valueOf(numBookies);
-            LOG.info("Starting bk");
-            LocalBookKeeper.main(args);
-          } catch (InterruptedException e) {
-            // go away quietly
-          } catch (Exception e) {
-            LOG.error("Error starting local bk", e);
-          }
-        }
-      };
-  }
-
-  void start() throws Exception {
-    bkthread.start();
-    if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
-      throw new Exception("Error starting zookeeper/bookkeeper");
-    }
-    assertEquals("Not all bookies started",
-                 numBookies, checkBookiesUp(numBookies, 10));
-  }
-
-  void teardown() throws Exception {
-    if (bkthread != null) {
-      bkthread.interrupt();
-      bkthread.join();
-    }
-  }
-
-  static ZooKeeper connectZooKeeper()
-      throws IOException, KeeperException, InterruptedException {
-    final CountDownLatch latch = new CountDownLatch(1);
-
-    ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
-        public void process(WatchedEvent event) {
-          if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
-            latch.countDown();
-          }
-        }
-      });
-    if (!latch.await(3, TimeUnit.SECONDS)) {
-      throw new IOException("Zookeeper took too long to connect");
-    }
-    return zkc;
-  }
-
-  static URI createJournalURI(String path) throws Exception {
-    return URI.create("bookkeeper://" + zkEnsemble + path);
-  }
-
-  static void addJournalManagerDefinition(Configuration conf) {
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".bookkeeper",
-             "org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager");
-  }
-
-  BookieServer newBookie() throws Exception {
-    int port = nextPort++;
-    ServerConfiguration bookieConf = new ServerConfiguration();
-    bookieConf.setBookiePort(port);
-    File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
-                                      "test");
-    tmpdir.delete();
-    tmpdir.mkdir();
-
-    bookieConf.setZkServers(zkEnsemble);
-    bookieConf.setJournalDirName(tmpdir.getPath());
-    bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
-
-    BookieServer b = new BookieServer(bookieConf);
-    b.start();
-    for (int i = 0; i < 10 && !b.isRunning(); i++) {
-      Thread.sleep(10000);
-    }
-    if (!b.isRunning()) {
-      throw new IOException("Bookie would not start");
-    }
-    return b;
-  }
-
-  /**
-   * Check that a number of bookies are available.
-   * @param count number of bookies required
-   * @param timeout number of seconds to wait for bookies to start
-   * @return the number of bookies seen when the wait ended
-   */
-  int checkBookiesUp(int count, int timeout) throws Exception {
-    ZooKeeper zkc = connectZooKeeper();
-    try {
-      int mostRecentSize = 0;
-      for (int i = 0; i < timeout; i++) {
-        try {
-          List<String> children = zkc.getChildren("/ledgers/available",
-                                                  false);
-          mostRecentSize = children.size();
-          // Skip 'readonly znode' which is used for keeping R-O bookie details
-          if (children.contains("readonly")) {
-            mostRecentSize = children.size() - 1;
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found " + mostRecentSize + " bookies up, "
-                      + "waiting for " + count);
-            if (LOG.isTraceEnabled()) {
-              for (String child : children) {
-                LOG.trace(" server: " + child);
-              }
-            }
-          }
-          if (mostRecentSize == count) {
-            break;
-          }
-        } catch (KeeperException e) {
-          // ignore
-        }
-        Thread.sleep(1000);
-      }
-      return mostRecentSize;
-    } finally {
-      zkc.close();
-    }
-  }
-}
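
For reference, the latch-based ZooKeeper connection idiom used by BKJMUtil above is a common pattern; a minimal standalone sketch (a hypothetical ZkConnect helper, assuming a reachable ZooKeeper ensemble) looks like this:

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    class ZkConnect {
      // Block until the session reaches SyncConnected, or give up after 3 seconds.
      static ZooKeeper connect(String ensemble, int sessionTimeout)
          throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zkc = new ZooKeeper(ensemble, sessionTimeout, new Watcher() {
          public void process(WatchedEvent event) {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
              latch.countDown();
            }
          }
        });
        if (!latch.await(3, TimeUnit.SECONDS)) {
          zkc.close();
          throw new IOException("ZooKeeper took too long to connect");
        }
        return zkc;
      }
    }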

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
deleted file mode 100644
index ff8c00d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
-import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
-import org.apache.hadoop.hdfs.HAUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-
-import org.apache.hadoop.ipc.RemoteException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.ExitUtil.ExitException;
-
-import org.apache.bookkeeper.proto.BookieServer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-
-/**
- * Integration test to ensure that the BookKeeper JournalManager
- * works for HDFS Namenode HA
- */
-@RunWith(Parameterized.class)
-public class TestBookKeeperAsHASharedDir {
-  static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class);
-
-  private static BKJMUtil bkutil;
-  static int numBookies = 3;
-
-  private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager";
-
-  @Parameters
-  public static Collection<Object[]> data() {
-    Collection<Object[]> params = new ArrayList<Object[]>();
-    params.add(new Object[]{ Boolean.FALSE });
-    params.add(new Object[]{ Boolean.TRUE });
-    return params;
-  }
-
-  private static boolean useAsyncEditLog;
-  public TestBookKeeperAsHASharedDir(Boolean async) {
-    useAsyncEditLog = async;
-  }
-
-  private static Configuration getConf() {
-    Configuration conf = new Configuration();
-    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
-        useAsyncEditLog);
-    return conf;
-  }
-
-  @BeforeClass
-  public static void setupBookkeeper() throws Exception {
-    bkutil = new BKJMUtil(numBookies);
-    bkutil.start();
-  }
-  
-  @Before
-  public void clearExitStatus() {
-    ExitUtil.resetFirstExitException();
-  }
-
-  @AfterClass
-  public static void teardownBookkeeper() throws Exception {
-    bkutil.teardown();
-  }
-
-  /**
-   * Test a simple HA failover use case with BK.
-   */
-  @Test
-  public void testFailoverWithBK() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = getConf();
-      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
-               BKJMUtil.createJournalURI("/hotfailover").toString());
-      BKJMUtil.addJournalManagerDefinition(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology())
-        .numDataNodes(0)
-        .manageNameDfsSharedDirs(false)
-        .build();
-      NameNode nn1 = cluster.getNameNode(0);
-      NameNode nn2 = cluster.getNameNode(1);
-
-      cluster.waitActive();
-      cluster.transitionToActive(0);
-
-      Path p = new Path("/testBKJMfailover");
-
-      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
-
-      fs.mkdirs(p);
-      cluster.shutdownNameNode(0);
-
-      cluster.transitionToActive(1);
-
-      assertTrue(fs.exists(p));
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  /**
-   * Test HA failover where BK, as the shared storage, fails.
-   * Once it becomes available again, a standby can come up.
-   * Verify that any write that happened after the BK failure is not
-   * visible on the standby.
-   */
-  @Test
-  public void testFailoverWithFailingBKCluster() throws Exception {
-    int ensembleSize = numBookies + 1;
-    BookieServer newBookie = bkutil.newBookie();
-    assertEquals("New bookie didn't start",
-                 ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
-    BookieServer replacementBookie = null;
-
-    MiniDFSCluster cluster = null;
-
-    try {
-      Configuration conf = getConf();
-      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
-               BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
-      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
-                  ensembleSize);
-      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
-                  ensembleSize);
-      BKJMUtil.addJournalManagerDefinition(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology())
-        .numDataNodes(0)
-        .manageNameDfsSharedDirs(false)
-        .checkExitOnShutdown(false)
-        .build();
-      NameNode nn1 = cluster.getNameNode(0);
-      NameNode nn2 = cluster.getNameNode(1);
-
-      cluster.waitActive();
-      cluster.transitionToActive(0);
-
-      Path p1 = new Path("/testBKJMFailingBKCluster1");
-      Path p2 = new Path("/testBKJMFailingBKCluster2");
-
-      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
-
-      fs.mkdirs(p1);
-      newBookie.shutdown(); // will take down shared storage
-      assertEquals("New bookie didn't stop",
-                   numBookies, bkutil.checkBookiesUp(numBookies, 10));
-
-      try {
-        fs.mkdirs(p2);
-        fail("mkdirs should result in the NN exiting");
-      } catch (RemoteException re) {
-        assertTrue(re.getClassName().contains("ExitException"));
-      }
-      cluster.shutdownNameNode(0);
-
-      try {
-        cluster.transitionToActive(1);
-        fail("Shouldn't have been able to transition with bookies down");
-      } catch (ExitException ee) {
-        assertTrue("Should shutdown due to required journal failure",
-            ee.getMessage().contains(
-                "starting log segment 3 failed for required journal"));
-      }
-
-      replacementBookie = bkutil.newBookie();
-      assertEquals("Replacement bookie didn't start",
-                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-      cluster.transitionToActive(1); // should work fine now
-
-      assertTrue(fs.exists(p1));
-      assertFalse(fs.exists(p2));
-    } finally {
-      newBookie.shutdown();
-      if (replacementBookie != null) {
-        replacementBookie.shutdown();
-      }
-
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  /**
-   * Test that two namenodes can't both continue as primary.
-   */
-  @Test
-  public void testMultiplePrimariesStarted() throws Exception {
-    Path p1 = new Path("/testBKJMMultiplePrimary");
-
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = getConf();
-      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
-               BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
-      BKJMUtil.addJournalManagerDefinition(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology())
-        .numDataNodes(0)
-        .manageNameDfsSharedDirs(false)
-        .checkExitOnShutdown(false)
-        .build();
-      NameNode nn1 = cluster.getNameNode(0);
-      NameNode nn2 = cluster.getNameNode(1);
-      cluster.waitActive();
-      cluster.transitionToActive(0);
-
-      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
-      fs.mkdirs(p1);
-      nn1.getRpcServer().rollEditLog();
-      cluster.transitionToActive(1);
-      fs = cluster.getFileSystem(0); // get the older active server.
-
-      try {
-        System.out.println("DMS: > *************");
-        boolean foo = fs.delete(p1, true);
-        System.out.println("DMS: < ************* "+foo);
-        fail("Log update on older active should cause it to exit");
-      } catch (RemoteException re) {
-        assertTrue(re.getClassName().contains("ExitException"));
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-  
-  /**
-   * Use NameNode INITIALIZESHAREDEDITS to initialize the shared edits, i.e.
-   * copy the edit log segments to the new bkjm shared edits.
-   * 
-   * @throws Exception
-   */
-  @Test
-  public void testInitializeBKSharedEdits() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = getConf();
-      HAUtil.setAllowStandbyReads(conf, true);
-
-      MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
-      cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
-          .numDataNodes(0).build();
-      cluster.waitActive();
-      // Shut down and clear the current file-based shared dir.
-      cluster.shutdownNameNodes();
-      File shareddir = new File(cluster.getSharedEditsDir(0, 1));
-      assertTrue("Initial Shared edits dir not fully deleted",
-          FileUtil.fullyDelete(shareddir));
-
-      // Check that namenodes cannot start without the shared dir.
-      assertCanNotStartNamenode(cluster, 0);
-      assertCanNotStartNamenode(cluster, 1);
-
-      // Configure bkjm as new shared edits dir in both namenodes
-      Configuration nn1Conf = cluster.getConfiguration(0);
-      Configuration nn2Conf = cluster.getConfiguration(1);
-      nn1Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
-          .createJournalURI("/initializeSharedEdits").toString());
-      nn2Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
-          .createJournalURI("/initializeSharedEdits").toString());
-      BKJMUtil.addJournalManagerDefinition(nn1Conf);
-      BKJMUtil.addJournalManagerDefinition(nn2Conf);
-
-      // Initialize the BKJM shared edits.
-      assertFalse(NameNode.initializeSharedEdits(nn1Conf));
-
-      // NameNode should be able to start and should be in sync with BKJM as
-      // shared dir
-      assertCanStartHANameNodes(cluster, conf, "/testBKJMInitialize");
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  private void assertCanNotStartNamenode(MiniDFSCluster cluster, int nnIndex) {
-    try {
-      cluster.restartNameNode(nnIndex, false);
-      fail("Should not have been able to start NN" + (nnIndex)
-          + " without shared dir");
-    } catch (IOException ioe) {
-      LOG.info("Got expected exception", ioe);
-      GenericTestUtils.assertExceptionContains(
-          "storage directory does not exist or is not accessible", ioe);
-    }
-  }
-
-  private void assertCanStartHANameNodes(MiniDFSCluster cluster,
-      Configuration conf, String path) throws ServiceFailedException,
-      IOException, URISyntaxException, InterruptedException {
-    // Now should be able to start both NNs. Pass "false" here so that we don't
-    // try to waitActive on all NNs, since the second NN doesn't exist yet.
-    cluster.restartNameNode(0, false);
-    cluster.restartNameNode(1, true);
-
-    // Make sure HA is working.
-    cluster
-        .getNameNode(0)
-        .getRpcServer()
-        .transitionToActive(
-            new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
-    FileSystem fs = null;
-    try {
-      Path newPath = new Path(path);
-      fs = HATestUtil.configureFailoverFs(cluster, conf);
-      assertTrue(fs.mkdirs(newPath));
-      HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
-          cluster.getNameNode(1));
-      assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
-          newPath.toString(), false).isDir());
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-    }
-  }
-
-  /**
-   * NameNode should load the edits correctly if the applicable edits are
-   * present in the BKJM.
-   */
-  @Test
-  public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = getConf();
-      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
-          .createJournalURI("/correctEditLogSelection").toString());
-      BKJMUtil.addJournalManagerDefinition(conf);
-
-      cluster = new MiniDFSCluster.Builder(conf)
-          .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(0)
-          .manageNameDfsSharedDirs(false).build();
-      NameNode nn1 = cluster.getNameNode(0);
-      NameNode nn2 = cluster.getNameNode(1);
-      cluster.waitActive();
-      cluster.transitionToActive(0);
-      nn1.getRpcServer().rollEditLog(); // Roll Edits from current Active.
-      // Gracefully transition the current active to standby.
-      cluster.transitionToStandby(0);
-      // Make the other NN active and roll edits multiple times.
-      cluster.transitionToActive(1);
-      nn2.getRpcServer().rollEditLog();
-      nn2.getRpcServer().rollEditLog();
-      // Now one more failover; NN1 should be able to fail over successfully.
-      cluster.transitionToStandby(1);
-      cluster.transitionToActive(0);
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-}
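
For context, the tests above wire BKJM in as the HA shared-edits directory with just two configuration entries; a minimal sketch of that wiring (key names as in the deleted code; the ZooKeeper address and journal path are placeholders) would be:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class BkjmSharedEdits {
      static Configuration configure() {
        Configuration conf = new Configuration();
        // Point the shared edits dir at a bookkeeper:// URI (ZK ensemble + journal path).
        conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
            "bookkeeper://127.0.0.1:2181/hdfsjournal");
        // Register the BKJM implementation for the "bookkeeper" URI scheme.
        conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".bookkeeper",
            "org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager");
        return conf;
      }
    }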

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
deleted file mode 100644
index f3f6ce5..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.Random;
-
-import org.apache.bookkeeper.util.LocalBookKeeper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-
-public class TestBookKeeperConfiguration {
-  private static final Log LOG = LogFactory
-      .getLog(TestBookKeeperConfiguration.class);
-  private static final int ZK_SESSION_TIMEOUT = 5000;
-  private static final String HOSTPORT = "127.0.0.1:2181";
-  private static final int CONNECTION_TIMEOUT = 30000;
-  private static NIOServerCnxnFactory serverFactory;
-  private static ZooKeeperServer zks;
-  private static ZooKeeper zkc;
-  private static int ZooKeeperDefaultPort = 2181;
-  private static File ZkTmpDir;
-  private BookKeeperJournalManager bkjm;
-  private static final String BK_ROOT_PATH = "/ledgers";
-
-  private static ZooKeeper connectZooKeeper(String ensemble)
-      throws IOException, KeeperException, InterruptedException {
-    final CountDownLatch latch = new CountDownLatch(1);
-
-    ZooKeeper zkc = new ZooKeeper(HOSTPORT, ZK_SESSION_TIMEOUT, new Watcher() {
-      public void process(WatchedEvent event) {
-        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
-          latch.countDown();
-        }
-      }
-    });
-    if (!latch.await(ZK_SESSION_TIMEOUT, TimeUnit.MILLISECONDS)) {
-      throw new IOException("Zookeeper took too long to connect");
-    }
-    return zkc;
-  }
-
-  private NamespaceInfo newNSInfo() {
-    Random r = new Random();
-    return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
-  }
-
-  @BeforeClass
-  public static void setupZooKeeper() throws Exception {
-    // create a ZooKeeper server (dataDir, dataLogDir, port)
-    LOG.info("Starting ZK server");
-    ZkTmpDir = File.createTempFile("zookeeper", "test");
-    ZkTmpDir.delete();
-    ZkTmpDir.mkdir();
-
-    try {
-      zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
-      serverFactory = new NIOServerCnxnFactory();
-      serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
-      serverFactory.startup(zks);
-    } catch (Exception e) {
-      LOG.error("Exception while instantiating ZooKeeper", e);
-    }
-
-    boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
-    LOG.debug("ZooKeeper server up: " + b);
-  }
-
-  @Before
-  public void setup() throws Exception {
-    zkc = connectZooKeeper(HOSTPORT);
-    try {
-      ZKUtil.deleteRecursive(zkc, BK_ROOT_PATH);
-    } catch (KeeperException.NoNodeException e) {
-      LOG.debug("Ignoring no node exception on cleanup", e);
-    } catch (Exception e) {
-      LOG.error("Exception when deleting bookie root path in zk", e);
-    }
-  }
-
-  @After
-  public void teardown() throws Exception {
-    if (null != zkc) {
-      zkc.close();
-    }
-    if (null != bkjm) {
-      bkjm.close();
-    }
-  }
-
-  @AfterClass
-  public static void teardownZooKeeper() throws Exception {
-    if (null != zkc) {
-      zkc.close();
-    }
-  }
-
-  /**
-   * Verify that BKJM creates the bookie available path configured in
-   * 'dfs.namenode.bookkeeperjournal.zk.availablebookies'.
-   */
-  @Test
-  public void testWithConfiguringBKAvailablePath() throws Exception {
-    // set Bookie available path in the configuration
-    String bkAvailablePath 
-      = BookKeeperJournalManager.BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT;
-    Configuration conf = new Configuration();
-    conf.setStrings(BookKeeperJournalManager.BKJM_ZK_LEDGERS_AVAILABLE_PATH,
-        bkAvailablePath);
-    Assert.assertNull(bkAvailablePath + " already exists", zkc.exists(
-        bkAvailablePath, false));
-    NamespaceInfo nsi = newNSInfo();
-    bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"),
-        nsi);
-    bkjm.format(nsi);
-    Assert.assertNotNull("Bookie available path : " + bkAvailablePath
-        + " doesn't exist", zkc.exists(bkAvailablePath, false));
-  }
-
-  /**
-   * Verify that BKJM creates the default bookie available path when
-   * 'dfs.namenode.bookkeeperjournal.zk.availablebookies' is not configured.
-   */
-  @Test
-  public void testDefaultBKAvailablePath() throws Exception {
-    Configuration conf = new Configuration();
-    Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists(
-        BK_ROOT_PATH, false));
-    NamespaceInfo nsi = newNSInfo();
-    bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"),
-        nsi);
-    bkjm.format(nsi);
-    Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH
-        + " doesn't exist", zkc.exists(BK_ROOT_PATH, false));
-  }
-}
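
The formatting flow exercised by both tests above — build a BookKeeperJournalManager against a bookkeeper:// URI and format it with a NamespaceInfo — reduces to a short sketch (signatures as used in the deleted tests; the ZooKeeper address and namespace values are placeholders):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    class BkjmFormatSketch {
      static void formatJournal() throws Exception {
        Configuration conf = new Configuration();
        NamespaceInfo nsi = new NamespaceInfo(12345, "testCluster", "TestBPID", -1);
        BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
            URI.create("bookkeeper://127.0.0.1:2181/hdfsjournal-example"), nsi);
        try {
          bkjm.format(nsi); // creates the available-bookies path in ZooKeeper
        } finally {
          bkjm.close();
        }
      }
    }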

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
deleted file mode 100644
index 52e4568..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Unit test for the bkjm's streams
- */
-public class TestBookKeeperEditLogStreams {
-  static final Log LOG = LogFactory.getLog(TestBookKeeperEditLogStreams.class);
-
-  private static BKJMUtil bkutil;
-  private final static int numBookies = 3;
-
-  @BeforeClass
-  public static void setupBookkeeper() throws Exception {
-    bkutil = new BKJMUtil(numBookies);
-    bkutil.start();
-  }
-
-  @AfterClass
-  public static void teardownBookkeeper() throws Exception {
-    bkutil.teardown();
-  }
-
-  /**
-   * Test that bkjm will refuse to open a stream on an empty
-   * ledger.
-   */
-  @Test
-  public void testEmptyInputStream() throws Exception {
-    ZooKeeper zk = BKJMUtil.connectZooKeeper();
-
-    BookKeeper bkc = new BookKeeper(new ClientConfiguration(), zk);
-    try {
-      LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32, "foobar"
-          .getBytes());
-      lh.close();
-
-      EditLogLedgerMetadata metadata = new EditLogLedgerMetadata("/foobar",
-          HdfsServerConstants.NAMENODE_LAYOUT_VERSION, lh.getId(), 0x1234);
-      try {
-        new BookKeeperEditLogInputStream(lh, metadata, -1);
-        fail("Shouldn't get this far, should have thrown");
-      } catch (IOException ioe) {
-        assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
-      }
-
-      metadata = new EditLogLedgerMetadata("/foobar",
-          HdfsServerConstants.NAMENODE_LAYOUT_VERSION, lh.getId(), 0x1234);
-      try {
-        new BookKeeperEditLogInputStream(lh, metadata, 0);
-        fail("Shouldn't get this far, should have thrown");
-      } catch (IOException ioe) {
-        assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
-      }
-    } finally {
-      bkc.close();
-      zk.close();
-    }
-  }
-}
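
The ledger handling in the test above follows the plain BookKeeper client API; a minimal sketch of creating and closing an empty ledger (digest type and password taken from the deleted test; zk is assumed to be an already-connected session):

    import org.apache.bookkeeper.client.BookKeeper;
    import org.apache.bookkeeper.client.LedgerHandle;
    import org.apache.bookkeeper.conf.ClientConfiguration;
    import org.apache.zookeeper.ZooKeeper;

    class EmptyLedgerSketch {
      static long createEmptyLedger(ZooKeeper zk) throws Exception {
        BookKeeper bkc = new BookKeeper(new ClientConfiguration(), zk);
        try {
          LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32,
              "foobar".getBytes());
          long ledgerId = lh.getId();
          lh.close(); // an empty, closed ledger has no entries to read
          return ledgerId;
        } finally {
          bkc.close();
        }
      }
    }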

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
deleted file mode 100644
index b8fc30d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-import java.net.BindException;
-import java.util.Random;
-
-/**
- * Runs the same tests as TestStandbyCheckpoints, but
- * using a bookkeeper journal manager as the shared directory
- */
-public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
-  // Override the NN count in the superclass.
-  static {
-    TestStandbyCheckpoints.NUM_NNS = 2;
-  }
-  private static BKJMUtil bkutil = null;
-  static int numBookies = 3;
-  static int journalCount = 0;
-  private final Random random = new Random();
-
-  private static final Log LOG = LogFactory.getLog(TestStandbyCheckpoints.class);
-
-  @SuppressWarnings("rawtypes")
-  @Override
-  @Before
-  public void setupCluster() throws Exception {
-    Configuration conf = setupCommonConfig();
-    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
-             BKJMUtil.createJournalURI("/checkpointing" + journalCount++)
-             .toString());
-    BKJMUtil.addJournalManagerDefinition(conf);
-
-    int retryCount = 0;
-    while (true) {
-      try {
-        int basePort = 10060 + random.nextInt(100) * 2;
-        MiniDFSNNTopology topology = new MiniDFSNNTopology()
-          .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
-            .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort))
-            .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1)));
-
-        cluster = new MiniDFSCluster.Builder(conf)
-          .nnTopology(topology)
-          .numDataNodes(1)
-          .manageNameDfsSharedDirs(false)
-          .build();
-        cluster.waitActive();
-
-        setNNs();
-        fs = HATestUtil.configureFailoverFs(cluster, conf);
-
-        cluster.transitionToActive(0);
-        ++retryCount;
-        break;
-      } catch (BindException e) {
-        LOG.info("Set up MiniDFSCluster failed due to port conflicts, retry "
-            + retryCount + " times");
-      }
-    }
-  }
-
-  @BeforeClass
-  public static void startBK() throws Exception {
-    journalCount = 0;
-    bkutil = new BKJMUtil(numBookies);
-    bkutil.start();
-  }
-
-  @AfterClass
-  public static void shutdownBK() throws Exception {
-    if (bkutil != null) {
-      bkutil.teardown();
-    }
-  }
-
-  @Override
-  public void testCheckpointCancellation() throws Exception {
-    // Overridden as the implementation in the superclass assumes that writes
-    // are to a file. This should be fixed at some point.
-  }
-}
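
The retry loop in setupCluster above is a generic guard against HTTP port collisions between concurrently running tests; distilled, the pattern is (the factory interface and retry bound here are illustrative, not part of the deleted code):

    import java.net.BindException;
    import java.util.Random;

    class BindRetrySketch {
      interface ClusterFactory {
        void start(int basePort) throws Exception;
      }

      // Retry startup with a fresh random base port after each BindException.
      static void startWithRetries(ClusterFactory factory, int maxRetries)
          throws Exception {
        Random random = new Random();
        for (int attempt = 0; ; attempt++) {
          int basePort = 10060 + random.nextInt(100) * 2;
          try {
            factory.start(basePort);
            return;
          } catch (BindException e) {
            if (attempt >= maxRetries) {
              throw e;
            }
            // Port collision; loop around and try a different base port.
          }
        }
      }
    }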

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
deleted file mode 100644
index 07fcd72..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
+++ /dev/null
@@ -1,984 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.spy;
-import org.junit.Test;
-import org.junit.Before;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
-import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TestBookKeeperJournalManager {
-  static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
-  
-  private static final long DEFAULT_SEGMENT_SIZE = 1000;
-
-  protected static Configuration conf = new Configuration();
-  private ZooKeeper zkc;
-  private static BKJMUtil bkutil;
-  static int numBookies = 3;
-  private BookieServer newBookie;
-
-  @BeforeClass
-  public static void setupBookkeeper() throws Exception {
-    bkutil = new BKJMUtil(numBookies);
-    bkutil.start();
-  }
-
-  @AfterClass
-  public static void teardownBookkeeper() throws Exception {
-    bkutil.teardown();
-  }
-
-  @Before
-  public void setup() throws Exception {
-    zkc = BKJMUtil.connectZooKeeper();
-  }
-
-  @After
-  public void teardown() throws Exception {
-    zkc.close();
-    if (newBookie != null) {
-      newBookie.shutdown();
-    }
-  }
-
-  private NamespaceInfo newNSInfo() {
-    Random r = new Random();
-    return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
-  }
-
-  @Test
-  public void testSimpleWrite() throws Exception {
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
-    bkjm.format(nsi);
-
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1 ; i <= 100; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(1, 100);
- 
-    String zkpath = bkjm.finalizedLedgerZNode(1, 100);
-    
-    assertNotNull(zkc.exists(zkpath, false));
-    assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
-  }
-
-  @Test
-  public void testNumberOfTransactions() throws Exception {
-    NamespaceInfo nsi = newNSInfo();
-
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
-    bkjm.format(nsi);
-
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1 ; i <= 100; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(1, 100);
-
-    long numTrans = bkjm.getNumberOfTransactions(1, true);
-    assertEquals(100, numTrans);
-  }
-
-  @Test 
-  public void testNumberOfTransactionsWithGaps() throws Exception {
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi);
-    bkjm.format(nsi);
-
-    long txid = 1;
-    for (long i = 0; i < 3; i++) {
-      long start = txid;
-      EditLogOutputStream out = bkjm.startLogSegment(start,
-          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-      for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
-        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-        op.setTransactionId(txid++);
-        out.write(op);
-      }
-      out.close();
-      bkjm.finalizeLogSegment(start, txid-1);
-      assertNotNull(
-          zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
-    }
-    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1,
-                                         DEFAULT_SEGMENT_SIZE*2), -1);
-    
-    long numTrans = bkjm.getNumberOfTransactions(1, true);
-    assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
-    
-    try {
-      numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true);
-      fail("Should have thrown corruption exception by this point");
-    } catch (JournalManager.CorruptionException ce) {
-      // if we get here, everything is going good
-    }
-
-    numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true);
-    assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
-  }
-
-  @Test
-  public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi);
-    bkjm.format(nsi);
-
-    long txid = 1;
-    for (long i = 0; i < 3; i++) {
-      long start = txid;
-      EditLogOutputStream out = bkjm.startLogSegment(start,
-          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-      for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
-        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-        op.setTransactionId(txid++);
-        out.write(op);
-      }
-      
-      out.close();
-      bkjm.finalizeLogSegment(start, (txid-1));
-      assertNotNull(
-          zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
-    }
-    long start = txid;
-    EditLogOutputStream out = bkjm.startLogSegment(start,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE/2; j++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(txid++);
-      out.write(op);
-    }
-    out.setReadyToFlush();
-    out.flush();
-    out.abort();
-    out.close();
-    
-    long numTrans = bkjm.getNumberOfTransactions(1, true);
-    assertEquals((txid-1), numTrans);
-  }
-
-  /**
-   * Create a bkjm namespace, write a journal from txid 1, close stream.
-   * Try to create a new journal from txid 1. Should throw an exception.
-   */
-  @Test
-  public void testWriteRestartFrom1() throws Exception {
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi);
-    bkjm.format(nsi);
-
-    long txid = 1;
-    long start = txid;
-    EditLogOutputStream out = bkjm.startLogSegment(txid,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(txid++);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(start, (txid-1));
-    
-    txid = 1;
-    try {
-      out = bkjm.startLogSegment(txid,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-      fail("Shouldn't be able to start another journal from " + txid
-          + " when one already exists");
-    } catch (Exception ioe) {
-      LOG.info("Caught exception as expected", ioe);
-    }
-
-    // test border case
-    txid = DEFAULT_SEGMENT_SIZE;
-    try {
-      out = bkjm.startLogSegment(txid,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-      fail("Shouldn't be able to start another journal from " + txid
-          + " when one already exists");
-    } catch (IOException ioe) {
-      LOG.info("Caught exception as expected", ioe);
-    }
-
-    // open journal continuing from before
-    txid = DEFAULT_SEGMENT_SIZE + 1;
-    start = txid;
-    out = bkjm.startLogSegment(start,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    assertNotNull(out);
-
-    for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(txid++);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(start, (txid-1));
-
-    // open journal arbitrarily far in the future
-    txid = DEFAULT_SEGMENT_SIZE * 4;
-    out = bkjm.startLogSegment(txid,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    assertNotNull(out);
-  }
-
-  @Test
-  public void testTwoWriters() throws Exception {
-    long start = 1;
-    NamespaceInfo nsi = newNSInfo();
-
-    BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
-    bkjm1.format(nsi);
-
-    BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
-
-
-    EditLogOutputStream out1 = bkjm1.startLogSegment(start,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    try {
-      bkjm2.startLogSegment(start,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-      fail("Shouldn't have been able to open the second writer");
-    } catch (IOException ioe) {
-      LOG.info("Caught exception as expected", ioe);
-    }finally{
-      out1.close();
-    }
-  }
-
-  @Test
-  public void testSimpleRead() throws Exception {
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-simpleread"),
-        nsi);
-    bkjm.format(nsi);
-
-    final long numTransactions = 10000;
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1 ; i <= numTransactions; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(1, numTransactions);
-
-    List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
-    bkjm.selectInputStreams(in, 1, true);
-    try {
-      assertEquals(numTransactions, 
-                   FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
-    } finally {
-      in.get(0).close();
-    }
-  }
-
-  @Test
-  public void testSimpleRecovery() throws Exception {
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"),
-        nsi);
-    bkjm.format(nsi);
-
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1 ; i <= 100; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.setReadyToFlush();
-    out.flush();
-
-    out.abort();
-    out.close();
-
-
-    assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
-    assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false));
-
-    bkjm.recoverUnfinalizedSegments();
-
-    assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
-    assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
-  }
-
-  /**
-   * Test that if enough bookies fail to prevent an ensemble from being
-   * formed, writes to bookkeeper will fail. Test that once an ensemble
-   * is available again, it can continue to write.
-   */
-  @Test
-  public void testAllBookieFailure() throws Exception {
-    // bookie to fail
-    newBookie = bkutil.newBookie();
-    BookieServer replacementBookie = null;
-
-    try {
-      int ensembleSize = numBookies + 1;
-      assertEquals("New bookie didn't start",
-                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
-      // ensure that the journal manager has to use all bookies,
-      // so that a failure will fail the journal manager
-      Configuration conf = new Configuration();
-      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
-                  ensembleSize);
-      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
-                  ensembleSize);
-      long txid = 1;
-      NamespaceInfo nsi = newNSInfo();
-      BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
-          nsi);
-      bkjm.format(nsi);
-      EditLogOutputStream out = bkjm.startLogSegment(txid,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-
-      for (long i = 1 ; i <= 3; i++) {
-        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-        op.setTransactionId(txid++);
-        out.write(op);
-      }
-      out.setReadyToFlush();
-      out.flush();
-      newBookie.shutdown();
-      assertEquals("New bookie didn't die",
-                   numBookies, bkutil.checkBookiesUp(numBookies, 10));
-
-      try {
-        for (long i = 1 ; i <= 3; i++) {
-          FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-          op.setTransactionId(txid++);
-          out.write(op);
-        }
-        out.setReadyToFlush();
-        out.flush();
-        fail("should not get to this stage");
-      } catch (IOException ioe) {
-        LOG.debug("Error writing to bookkeeper", ioe);
-        assertTrue("Invalid exception message",
-                   ioe.getMessage().contains("Failed to write to bookkeeper"));
-      }
-      replacementBookie = bkutil.newBookie();
-
-      assertEquals("New bookie didn't start",
-                   numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
-      bkjm.recoverUnfinalizedSegments();
-      out = bkjm.startLogSegment(txid,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-      for (long i = 1 ; i <= 3; i++) {
-        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-        op.setTransactionId(txid++);
-        out.write(op);
-      }
-
-      out.setReadyToFlush();
-      out.flush();
-
-    } catch (Exception e) {
-      LOG.error("Exception in test", e);
-      throw e;
-    } finally {
-      if (replacementBookie != null) {
-        replacementBookie.shutdown();
-      }
-      newBookie.shutdown();
-
-      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
-        LOG.warn("Not all bookies from this test shut down, expect errors");
-      }
-    }
-  }
-
-  /**
-   * Test that a BookKeeper JM can continue to work across the
-   * failure of a bookie. This should be handled transparently
-   * by bookkeeper.
-   */
-  @Test
-  public void testOneBookieFailure() throws Exception {
-    newBookie = bkutil.newBookie();
-    BookieServer replacementBookie = null;
-
-    try {
-      int ensembleSize = numBookies + 1;
-      assertEquals("New bookie didn't start",
-                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
-      // ensure that the journal manager has to use all bookies,
-      // so that a failure will fail the journal manager
-      Configuration conf = new Configuration();
-      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
-                  ensembleSize);
-      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
-                  ensembleSize);
-      long txid = 1;
-
-      NamespaceInfo nsi = newNSInfo();
-      BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"),
-          nsi);
-      bkjm.format(nsi);
-
-      EditLogOutputStream out = bkjm.startLogSegment(txid,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-      for (long i = 1 ; i <= 3; i++) {
-        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-        op.setTransactionId(txid++);
-        out.write(op);
-      }
-      out.setReadyToFlush();
-      out.flush();
-
-      replacementBookie = bkutil.newBookie();
-      assertEquals("replacement bookie didn't start",
-                   ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
-      newBookie.shutdown();
-      assertEquals("New bookie didn't die",
-                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
-      for (long i = 1 ; i <= 3; i++) {
-        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-        op.setTransactionId(txid++);
-        out.write(op);
-      }
-      out.setReadyToFlush();
-      out.flush();
-    } catch (Exception e) {
-      LOG.error("Exception in test", e);
-      throw e;
-    } finally {
-      if (replacementBookie != null) {
-        replacementBookie.shutdown();
-      }
-      newBookie.shutdown();
-
-      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
-        LOG.warn("Not all bookies from this test shut down, expect errors");
-      }
-    }
-  }
-  
-  /**
-   * If a journal manager has an empty inprogress znode, ensure that we throw
-   * an error: this should never happen in normal operation, so it indicates
-   * that some third party has corrupted the zookeeper state.
-   */
-  @Test
-  public void testEmptyInprogressNode() throws Exception {
-    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
-                                                                 nsi);
-    bkjm.format(nsi);
-
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1; i <= 100; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(1, 100);
-
-    out = bkjm.startLogSegment(101,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    out.close();
-    bkjm.close();
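-    // simulate third-party corruption by truncating the inprogress znode
-    // to zero bytes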
-    String inprogressZNode = bkjm.inprogressZNode(101);
-    zkc.setData(inprogressZNode, new byte[0], -1);
-
-    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
-    try {
-      bkjm.recoverUnfinalizedSegments();
-      fail("Should have failed. There should be no way of creating"
-          + " an empty inprogess znode");
-    } catch (IOException e) {
-      // correct behaviour
-      assertTrue("Exception different than expected", e.getMessage().contains(
-          "Invalid/Incomplete data in znode"));
-    } finally {
-      bkjm.close();
-    }
-  }
-
-  /**
-   * If a journal manager has a corrupt inprogress znode, ensure that we
-   * throw an error: this should never happen in normal operation, so it
-   * indicates that some third party has corrupted the zookeeper state.
-   */
-  @Test
-  public void testCorruptInprogressNode() throws Exception {
-    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
-                                                                 nsi);
-    bkjm.format(nsi);
-
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1; i <= 100; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(1, 100);
-
-    out = bkjm.startLogSegment(101,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    out.close();
-    bkjm.close();
-
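-    // simulate third-party corruption by overwriting the inprogress znode
-    // with garbage bytes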
-    String inprogressZNode = bkjm.inprogressZNode(101);
-    zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
-
-    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
-    try {
-      bkjm.recoverUnfinalizedSegments();
-      fail("Should have failed. There should be no way of creating"
-          + " an empty inprogess znode");
-    } catch (IOException e) {
-      // correct behaviour
-      assertTrue("Exception different than expected", e.getMessage().contains(
-          "has no field named"));
-    } finally {
-      bkjm.close();
-    }
-  }
-
-  /**
-   * Cases can occur where we create a segment but crash before we even have the
-   * chance to write the START_SEGMENT op. If this occurs we should warn, but
-   * load as normal
-   */
-  @Test
-  public void testEmptyInprogressLedger() throws Exception {
-    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
-                                                                 nsi);
-    bkjm.format(nsi);
-
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1; i <= 100; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(1, 100);
-
-    out = bkjm.startLogSegment(101,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    out.close();
-    bkjm.close();
-
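-    // recovery should warn about the empty in-progress ledger and then
-    // load normally, allowing a new segment at the same txid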
-    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
-    bkjm.recoverUnfinalizedSegments();
-    out = bkjm.startLogSegment(101,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1; i <= 100; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(101, 200);
-
-    bkjm.close();
-  }
-
-  /**
-   * Test that recovery succeeds if we fail between finalizing an inprogress
-   * segment and deleting the corresponding inprogress znode.
-   */
-  @Test
-  public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
-    URI uri = BKJMUtil
-        .createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
-                                                                 nsi);
-    bkjm.format(nsi);
-
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1; i <= 100; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.close();
-
-    String inprogressZNode = bkjm.inprogressZNode(1);
-    String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100);
-    assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode,
-        null));
-    assertNull("finalized znode exists", zkc.exists(finalizedZNode, null));
-
-    byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
-
-    // finalize
-    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
-    bkjm.recoverUnfinalizedSegments();
-    bkjm.close();
-
-    assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null));
-    assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode,
-        null));
-
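-    // re-create the inprogress znode from the saved data to simulate a crash
-    // after finalizing the segment but before deleting the znode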
-    zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
-
-    // should work fine
-    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
-    bkjm.recoverUnfinalizedSegments();
-    bkjm.close();
-  }
-
-  /**
-   * Tests that reading edit log file metadata from ZooKeeper handles a
-   * NoNodeException: bkjm.getInputStream(fromTxId, inProgressOk) should
-   * suppress the NoNodeException and continue. HDFS-3441.
-   */
-  @Test
-  public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
-    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
-                                                                 nsi);
-    bkjm.format(nsi);
-
-    try {
-      // start new inprogress log segment with txid=1
-      // and write transactions till txid=50
-      String zkpath1 = startAndFinalizeLogSegment(bkjm, 1, 50);
-
-      // start new inprogress log segment with txid=51
-      // and write transactions till txid=100
-      String zkpath2 = startAndFinalizeLogSegment(bkjm, 51, 100);
-
-      // read the metadata from ZK, simulating the situation where the edit
-      // log metadata is removed by the purger thread during the read
-      ZooKeeper zkspy = spy(BKJMUtil.connectZooKeeper());
-      bkjm.setZooKeeper(zkspy);
-      Mockito.doThrow(
-          new KeeperException.NoNodeException(zkpath2 + " doesn't exists"))
-          .when(zkspy).getData(zkpath2, false, null);
-
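-      // the ledger list should silently skip the "missing" segment and
-      // return only the first segment's metadata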
-      List<EditLogLedgerMetadata> ledgerList = bkjm.getLedgerList(false);
-      assertEquals("List contains the metadata of non exists path.", 1,
-          ledgerList.size());
-      assertEquals("LogLedgerMetadata contains wrong zk paths.", zkpath1,
-          ledgerList.get(0).getZkPath());
-    } finally {
-      bkjm.close();
-    }
-  }
-
-  private enum ThreadStatus {
-    COMPLETED, GOODEXCEPTION, BADEXCEPTION
-  }
-
-  /**
-   * Tests that concurrent calls to format will still allow one to succeed.
-   */
-  @Test
-  public void testConcurrentFormat() throws Exception {
-    final URI uri = BKJMUtil.createJournalURI("/hdfsjournal-concurrentformat");
-    final NamespaceInfo nsi = newNSInfo();
-
-    // populate with data first
-    BookKeeperJournalManager bkjm
-      = new BookKeeperJournalManager(conf, uri, nsi);
-    bkjm.format(nsi);
-    for (int i = 1; i < 100*2; i += 2) {
-      bkjm.startLogSegment(i, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-      bkjm.finalizeLogSegment(i, i+1);
-    }
-    bkjm.close();
-
-    final int numThreads = 40;
-    List<Callable<ThreadStatus>> threads
-      = new ArrayList<Callable<ThreadStatus>>();
-    final CyclicBarrier barrier = new CyclicBarrier(numThreads);
-
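-    // every thread builds its own journal manager, then all threads call
-    // format() at once when the barrier releases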
-    for (int i = 0; i < numThreads; i++) {
-      threads.add(new Callable<ThreadStatus>() {
-          public ThreadStatus call() {
-            BookKeeperJournalManager bkjm = null;
-            try {
-              bkjm = new BookKeeperJournalManager(conf, uri, nsi);
-              barrier.await();
-              bkjm.format(nsi);
-              return ThreadStatus.COMPLETED;
-            } catch (IOException ioe) {
-              LOG.info("Exception formatting ", ioe);
-              return ThreadStatus.GOODEXCEPTION;
-            } catch (InterruptedException ie) {
-              LOG.error("Interrupted. Something is broken", ie);
-              Thread.currentThread().interrupt();
-              return ThreadStatus.BADEXCEPTION;
-            } catch (Exception e) {
-              LOG.error("Some other bad exception", e);
-              return ThreadStatus.BADEXCEPTION;
-            } finally {
-              if (bkjm != null) {
-                try {
-                  bkjm.close();
-                } catch (IOException ioe) {
-                  LOG.error("Error closing journal manager", ioe);
-                }
-              }
-            }
-          }
-        });
-    }
-    ExecutorService service = Executors.newFixedThreadPool(numThreads);
-    List<Future<ThreadStatus>> statuses = service.invokeAll(threads, 60,
-                                                      TimeUnit.SECONDS);
-    int numCompleted = 0;
-    for (Future<ThreadStatus> s : statuses) {
-      assertTrue(s.isDone());
-      assertTrue("Thread threw invalid exception",
-          s.get() == ThreadStatus.COMPLETED
-          || s.get() == ThreadStatus.GOODEXCEPTION);
-      if (s.get() == ThreadStatus.COMPLETED) {
-        numCompleted++;
-      }
-    }
-    LOG.info("Completed " + numCompleted + " formats");
-    assertTrue("No thread managed to complete formatting", numCompleted > 0);
-  }
-
-  @Test(timeout = 120000)
-  public void testDefaultAckQuorum() throws Exception {
-    newBookie = bkutil.newBookie();
-    int ensembleSize = numBookies + 1;
-    int quorumSize = numBookies + 1;
-    // ensure that the journal manager has to use all bookies,
-    // so that a failure will fail the journal manager
-    Configuration conf = new Configuration();
-    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
-        ensembleSize);
-    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
-        quorumSize);
-    // set the add-entry timeout to 2 seconds
-    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
-        2);
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
-    bkjm.format(nsi);
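-    // suspend one bookie; with the default ack quorum equal to the write
-    // quorum, adds cannot be fully acknowledged and the 2s timeout fires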
-    CountDownLatch sleepLatch = new CountDownLatch(1);
-    sleepBookie(sleepLatch, newBookie);
-
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    int numTransactions = 100;
-    for (long i = 1; i <= numTransactions; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    try {
-      out.close();
-      bkjm.finalizeLogSegment(1, numTransactions);
-
-      List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
-      bkjm.selectInputStreams(in, 1, true);
-      try {
-        assertEquals(numTransactions,
-            FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
-      } finally {
-        in.get(0).close();
-      }
-      fail("Should throw exception as not enough non-faulty bookies available!");
-    } catch (IOException ioe) {
-      // expected
-    }
-  }
-
-  /**
-   * Test the ack quorum feature supported by bookkeeper. Keep an ack
-   * quorum's worth of bookies alive and put the remaining bookie to sleep.
-   * The client waits for acknowledgements from the ack-quorum bookies and,
-   * after receiving them, continues writing. Without the ack quorum, the
-   * client would block for a long time on each add.
-   */
-  @Test(timeout = 120000)
-  public void testAckQuorum() throws Exception {
-    // slow bookie
-    newBookie = bkutil.newBookie();
-    // make the quorum size and ensemble size the same to avoid interleaved
-    // writing of the ledger entries
-    int ensembleSize = numBookies + 1;
-    int quorumSize = numBookies + 1;
-    int ackSize = numBookies;
-    // ensure that the journal manager has to use all bookies,
-    // so that a failure will fail the journal manager
-    Configuration conf = new Configuration();
-    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
-        ensembleSize);
-    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
-        quorumSize);
-    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ACK_QUORUM_SIZE,
-        ackSize);
-    // set the add-entry timeout to 60 minutes
-    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
-        3600);
-
-    NamespaceInfo nsi = newNSInfo();
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
-    bkjm.format(nsi);
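-    // suspend one bookie; the remaining bookies still satisfy the ack
-    // quorum, so writes proceed without waiting on the slow bookie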
-    CountDownLatch sleepLatch = new CountDownLatch(1);
-    sleepBookie(sleepLatch, newBookie);
-
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    int numTransactions = 100;
-    for (long i = 1; i <= numTransactions; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(1, numTransactions);
-
-    List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
-    bkjm.selectInputStreams(in, 1, true);
-    try {
-      assertEquals(numTransactions,
-          FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
-    } finally {
-      sleepLatch.countDown();
-      in.get(0).close();
-      bkjm.close();
-    }
-  }
-
-  /**
-   * Suspend a bookie until the given latch is counted down.
-   *
-   * @param latch
-   *          Latch to wait on
-   * @param bookie
-   *          bookie server
-   * @throws Exception
-   */
-  private void sleepBookie(final CountDownLatch latch, final BookieServer bookie)
-      throws Exception {
-
-    Thread sleeper = new Thread() {
-      public void run() {
-        try {
-          bookie.suspendProcessing();
-          latch.await(60, TimeUnit.SECONDS);
-          bookie.resumeProcessing();
-        } catch (Exception e) {
-          LOG.error("Error suspending bookie", e);
-        }
-      }
-    };
-    sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
-    sleeper.start();
-  }
-
-
-  private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
-      int startTxid, int endTxid) throws IOException, KeeperException,
-      InterruptedException {
-    EditLogOutputStream out = bkjm.startLogSegment(startTxid,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = startTxid; i <= endTxid; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    // finalize the inprogress_1 log segment.
-    bkjm.finalizeLogSegment(startTxid, endTxid);
-    String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid);
-    assertNotNull(zkc.exists(zkpath1, false));
-    assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false));
-    return zkpath1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
deleted file mode 100644
index f5b86bc..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
-import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestBookKeeperSpeculativeRead {
-  private static final Log LOG = LogFactory
-      .getLog(TestBookKeeperSpeculativeRead.class);
-
-  private ZooKeeper zkc;
-  private static BKJMUtil bkutil;
-  private static int numLocalBookies = 1;
-  private static List<BookieServer> bks = new ArrayList<BookieServer>();
-
-  @BeforeClass
-  public static void setupBookkeeper() throws Exception {
-    bkutil = new BKJMUtil(1);
-    bkutil.start();
-  }
-
-  @AfterClass
-  public static void teardownBookkeeper() throws Exception {
-    bkutil.teardown();
-    for (BookieServer bk : bks) {
-      bk.shutdown();
-    }
-  }
-
-  @Before
-  public void setup() throws Exception {
-    zkc = BKJMUtil.connectZooKeeper();
-  }
-
-  @After
-  public void teardown() throws Exception {
-    zkc.close();
-  }
-
-  private NamespaceInfo newNSInfo() {
-    Random r = new Random();
-    return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
-  }
-
-  /**
-   * Test the speculative read feature supported by bookkeeper. Keep one
-   * bookie alive and put all the other bookies to sleep. Without speculative
-   * reads, the client would hang for a long time reading entries.
-   */
-  @Test(timeout = 120000)
-  public void testSpeculativeRead() throws Exception {
-    // start 9 more bookie servers
-    for (int i = 1; i < 10; i++) {
-      bks.add(bkutil.newBookie());
-    }
-    NamespaceInfo nsi = newNSInfo();
-    Configuration conf = new Configuration();
-    int ensembleSize = numLocalBookies + 9;
-    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
-        ensembleSize);
-    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
-        ensembleSize);
-    conf.setInt(
-        BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
-        100);
-    // set the read-entry timeout to 60 minutes
-    conf.setInt(
-        BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 3600);
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-specread"), nsi);
-    bkjm.format(nsi);
-
-    final long numTransactions = 1000;
-    EditLogOutputStream out = bkjm.startLogSegment(1,
-        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-    for (long i = 1; i <= numTransactions; i++) {
-      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
-      op.setTransactionId(i);
-      out.write(op);
-    }
-    out.close();
-    bkjm.finalizeLogSegment(1, numTransactions);
-
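-    // open the input streams while every bookie is still responsive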
-    List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
-    bkjm.selectInputStreams(in, 1, true);
-
-    // suspend the 9 extra bookie servers. Now only one bookie is running
-    // and responding to the clients
-    CountDownLatch sleepLatch = new CountDownLatch(1);
-    for (final BookieServer bookie : bks) {
-      sleepBookie(sleepLatch, bookie);
-    }
-    try {
-      assertEquals(numTransactions,
-          FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
-    } finally {
-      in.get(0).close();
-      sleepLatch.countDown();
-      bkjm.close();
-    }
-  }
-
-  /**
-   * Suspend a bookie until the given latch is counted down.
-   *
-   * @param latch
-   *          latch to wait on
-   * @param bookie
-   *          bookie server
-   * @throws Exception
-   */
-  private void sleepBookie(final CountDownLatch latch, final BookieServer bookie)
-      throws Exception {
-
-    Thread sleeper = new Thread() {
-      public void run() {
-        try {
-          bookie.suspendProcessing();
-          latch.await(2, TimeUnit.MINUTES);
-          bookie.resumeProcessing();
-        } catch (Exception e) {
-          LOG.error("Error suspending bookie", e);
-        }
-      }
-    };
-    sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
-    sleeper.start();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java
deleted file mode 100644
index ef7f708..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.File;
-import java.io.FileFilter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints.SlowCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class TestBootstrapStandbyWithBKJM {
-  private static BKJMUtil bkutil;
-  protected MiniDFSCluster cluster;
-
-  @BeforeClass
-  public static void setupBookkeeper() throws Exception {
-    bkutil = new BKJMUtil(3);
-    bkutil.start();
-  }
-
-  @AfterClass
-  public static void teardownBookkeeper() throws Exception {
-    bkutil.teardown();
-  }
-
-  @After
-  public void teardown() {
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
-    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
-        .createJournalURI("/bootstrapStandby").toString());
-    BKJMUtil.addJournalManagerDefinition(conf);
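-    // compress the fsimage with SlowCodec (borrowed from
-    // TestStandbyCheckpoints) so that checkpointing is deliberately slow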
-    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
-    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
-        SlowCodec.class.getCanonicalName());
-    CompressionCodecFactory.setCodecClasses(conf,
-        ImmutableList.<Class> of(SlowCodec.class));
-    MiniDFSNNTopology topology = new MiniDFSNNTopology()
-        .addNameservice(new MiniDFSNNTopology.NSConf("ns1").addNN(
-            new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001)).addNN(
-            new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
-    cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
-        .numDataNodes(1).manageNameDfsSharedDirs(false).build();
-    cluster.waitActive();
-  }
-
-  /**
-   * While bootstrapping, in-progress transaction entries should be skipped.
-   * Bootstrap usage for BKJM: "-force", "-nonInteractive", "-skipSharedEditsCheck"
-   */
-  @Test
-  public void testBootstrapStandbyWithActiveNN() throws Exception {
-    // make nn0 active
-    cluster.transitionToActive(0);
-   
-    // do ops and generate in-progress edit log data
-    Configuration confNN1 = cluster.getConfiguration(1);
-    DistributedFileSystem dfs = (DistributedFileSystem) HATestUtil
-        .configureFailoverFs(cluster, confNN1);
-    for (int i = 1; i <= 10; i++) {
-      dfs.mkdirs(new Path("/test" + i));
-    }
-    dfs.close();
-
-    // shutdown nn1 and delete its edit log files
-    cluster.shutdownNameNode(1);
-    deleteEditLogIfExists(confNN1);
-    cluster.getNameNodeRpc(0).setSafeMode(SafeModeAction.SAFEMODE_ENTER, true);
-    cluster.getNameNodeRpc(0).saveNamespace(0, 0);
-    cluster.getNameNodeRpc(0).setSafeMode(SafeModeAction.SAFEMODE_LEAVE, true);
-
-    // check without -skipSharedEditsCheck, Bootstrap should fail for BKJM
-    // immediately after saveNamespace
-    int rc = BootstrapStandby.run(new String[] { "-force", "-nonInteractive" },
-      confNN1);
-    Assert.assertEquals("Mismatches return code", 6, rc);
-
-    // check with -skipSharedEditsCheck
-    rc = BootstrapStandby.run(new String[] { "-force", "-nonInteractive",
-        "-skipSharedEditsCheck" }, confNN1);
-    Assert.assertEquals("Mismatches return code", 0, rc);
-
-    // Checkpoint as fast as we can, in a tight loop.
-    confNN1.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);
-    cluster.restartNameNode(1);
-    cluster.transitionToStandby(1);
-   
-    NameNode nn0 = cluster.getNameNode(0);
-    HATestUtil.waitForStandbyToCatchUp(nn0, cluster.getNameNode(1));
-    long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
-        .getFSImage().getMostRecentCheckpointTxId();
-    HATestUtil.waitForCheckpoint(cluster, 1,
-        ImmutableList.of((int) expectedCheckpointTxId));
-
-    // Should have copied over the namespace
-    FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
-        ImmutableList.of((int) expectedCheckpointTxId));
-    FSImageTestUtil.assertNNFilesMatch(cluster);
-  }
-
-  private void deleteEditLogIfExists(Configuration confNN1) {
-    String editDirs = confNN1.get(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
-    String[] listEditDirs = StringUtils.split(editDirs, ',');
-    Assert.assertTrue("Wrong edit directory path!", listEditDirs.length > 0);
-
-    for (String dir : listEditDirs) {
-      File curDir = new File(dir, "current");
-      File[] listFiles = curDir.listFiles(new FileFilter() {
-        @Override
-        public boolean accept(File f) {
-          if (!f.getName().startsWith("edits")) {
-            return true;
-          }
-          return false;
-        }
-      });
-      if (listFiles != null && listFiles.length > 0) {
-        for (File file : listFiles) {
-          Assert.assertTrue("Failed to delete edit files!", file.delete());
-        }
-      }
-    }
-  }
-}

