You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2016/10/06 13:59:04 UTC
[2/3] hadoop git commit: HDFS-10957. Retire BKJM from trunk
(Vinayakumar B)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
deleted file mode 100644
index b1fc3d7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.*;
-
-import java.net.URI;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.KeeperException;
-
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.List;
-
-import java.io.IOException;
-import java.io.File;
-
-/**
- * Utility class for setting up bookkeeper ensembles
- * and bringing individual bookies up and down
- */
-class BKJMUtil {
- protected static final Log LOG = LogFactory.getLog(BKJMUtil.class);
-
- int nextPort = 6000; // next port for additionally created bookies
- private Thread bkthread = null;
- private final static String zkEnsemble = "127.0.0.1:2181";
- int numBookies;
-
- BKJMUtil(final int numBookies) throws Exception {
- this.numBookies = numBookies;
-
- bkthread = new Thread() {
- public void run() {
- try {
- String[] args = new String[1];
- args[0] = String.valueOf(numBookies);
- LOG.info("Starting bk");
- LocalBookKeeper.main(args);
- } catch (InterruptedException e) {
- // go away quietly
- } catch (Exception e) {
- LOG.error("Error starting local bk", e);
- }
- }
- };
- }
-
- void start() throws Exception {
- bkthread.start();
- if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
- throw new Exception("Error starting zookeeper/bookkeeper");
- }
- assertEquals("Not all bookies started",
- numBookies, checkBookiesUp(numBookies, 10));
- }
-
- void teardown() throws Exception {
- if (bkthread != null) {
- bkthread.interrupt();
- bkthread.join();
- }
- }
-
- static ZooKeeper connectZooKeeper()
- throws IOException, KeeperException, InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
-
- ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
- public void process(WatchedEvent event) {
- if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
- latch.countDown();
- }
- }
- });
- if (!latch.await(3, TimeUnit.SECONDS)) {
- throw new IOException("Zookeeper took too long to connect");
- }
- return zkc;
- }
-
- static URI createJournalURI(String path) throws Exception {
- return URI.create("bookkeeper://" + zkEnsemble + path);
- }
-
- static void addJournalManagerDefinition(Configuration conf) {
- conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".bookkeeper",
- "org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager");
- }
-
- BookieServer newBookie() throws Exception {
- int port = nextPort++;
- ServerConfiguration bookieConf = new ServerConfiguration();
- bookieConf.setBookiePort(port);
- File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
- "test");
- tmpdir.delete();
- tmpdir.mkdir();
-
- bookieConf.setZkServers(zkEnsemble);
- bookieConf.setJournalDirName(tmpdir.getPath());
- bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
-
- BookieServer b = new BookieServer(bookieConf);
- b.start();
- for (int i = 0; i < 10 && !b.isRunning(); i++) {
- Thread.sleep(10000);
- }
- if (!b.isRunning()) {
- throw new IOException("Bookie would not start");
- }
- return b;
- }
-
- /**
- * Check that a number of bookies are available
- * @param count number of bookies required
- * @param timeout number of seconds to wait for bookies to start
- * @throws IOException if bookies are not started by the time the timeout hits
- */
- int checkBookiesUp(int count, int timeout) throws Exception {
- ZooKeeper zkc = connectZooKeeper();
- try {
- int mostRecentSize = 0;
- for (int i = 0; i < timeout; i++) {
- try {
- List<String> children = zkc.getChildren("/ledgers/available",
- false);
- mostRecentSize = children.size();
- // Skip 'readonly znode' which is used for keeping R-O bookie details
- if (children.contains("readonly")) {
- mostRecentSize = children.size() - 1;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found " + mostRecentSize + " bookies up, "
- + "waiting for " + count);
- if (LOG.isTraceEnabled()) {
- for (String child : children) {
- LOG.trace(" server: " + child);
- }
- }
- }
- if (mostRecentSize == count) {
- break;
- }
- } catch (KeeperException e) {
- // ignore
- }
- Thread.sleep(1000);
- }
- return mostRecentSize;
- } finally {
- zkc.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
deleted file mode 100644
index ff8c00d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
-import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
-import org.apache.hadoop.hdfs.HAUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-
-import org.apache.hadoop.ipc.RemoteException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.ExitUtil.ExitException;
-
-import org.apache.bookkeeper.proto.BookieServer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-
-/**
- * Integration test to ensure that the BookKeeper JournalManager
- * works for HDFS Namenode HA
- */
-@RunWith(Parameterized.class)
-public class TestBookKeeperAsHASharedDir {
- static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class);
-
- private static BKJMUtil bkutil;
- static int numBookies = 3;
-
- private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager";
-
- @Parameters
- public static Collection<Object[]> data() {
- Collection<Object[]> params = new ArrayList<Object[]>();
- params.add(new Object[]{ Boolean.FALSE });
- params.add(new Object[]{ Boolean.TRUE });
- return params;
- }
-
- private static boolean useAsyncEditLog;
- public TestBookKeeperAsHASharedDir(Boolean async) {
- useAsyncEditLog = async;
- }
-
- private static Configuration getConf() {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
- useAsyncEditLog);
- return conf;
- }
-
- @BeforeClass
- public static void setupBookkeeper() throws Exception {
- bkutil = new BKJMUtil(numBookies);
- bkutil.start();
- }
-
- @Before
- public void clearExitStatus() {
- ExitUtil.resetFirstExitException();
- }
-
- @AfterClass
- public static void teardownBookkeeper() throws Exception {
- bkutil.teardown();
- }
-
- /**
- * Test simple HA failover usecase with BK
- */
- @Test
- public void testFailoverWithBK() throws Exception {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = getConf();
- conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
- BKJMUtil.createJournalURI("/hotfailover").toString());
- BKJMUtil.addJournalManagerDefinition(conf);
-
- cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
- .numDataNodes(0)
- .manageNameDfsSharedDirs(false)
- .build();
- NameNode nn1 = cluster.getNameNode(0);
- NameNode nn2 = cluster.getNameNode(1);
-
- cluster.waitActive();
- cluster.transitionToActive(0);
-
- Path p = new Path("/testBKJMfailover");
-
- FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
-
- fs.mkdirs(p);
- cluster.shutdownNameNode(0);
-
- cluster.transitionToActive(1);
-
- assertTrue(fs.exists(p));
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- /**
- * Test HA failover, where BK, as the shared storage, fails.
- * Once it becomes available again, a standby can come up.
- * Verify that any write happening after the BK fail is not
- * available on the standby.
- */
- @Test
- public void testFailoverWithFailingBKCluster() throws Exception {
- int ensembleSize = numBookies + 1;
- BookieServer newBookie = bkutil.newBookie();
- assertEquals("New bookie didn't start",
- ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
- BookieServer replacementBookie = null;
-
- MiniDFSCluster cluster = null;
-
- try {
- Configuration conf = getConf();
- conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
- BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
- ensembleSize);
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
- ensembleSize);
- BKJMUtil.addJournalManagerDefinition(conf);
-
- cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
- .numDataNodes(0)
- .manageNameDfsSharedDirs(false)
- .checkExitOnShutdown(false)
- .build();
- NameNode nn1 = cluster.getNameNode(0);
- NameNode nn2 = cluster.getNameNode(1);
-
- cluster.waitActive();
- cluster.transitionToActive(0);
-
- Path p1 = new Path("/testBKJMFailingBKCluster1");
- Path p2 = new Path("/testBKJMFailingBKCluster2");
-
- FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
-
- fs.mkdirs(p1);
- newBookie.shutdown(); // will take down shared storage
- assertEquals("New bookie didn't stop",
- numBookies, bkutil.checkBookiesUp(numBookies, 10));
-
- try {
- fs.mkdirs(p2);
- fail("mkdirs should result in the NN exiting");
- } catch (RemoteException re) {
- assertTrue(re.getClassName().contains("ExitException"));
- }
- cluster.shutdownNameNode(0);
-
- try {
- cluster.transitionToActive(1);
- fail("Shouldn't have been able to transition with bookies down");
- } catch (ExitException ee) {
- assertTrue("Should shutdown due to required journal failure",
- ee.getMessage().contains(
- "starting log segment 3 failed for required journal"));
- }
-
- replacementBookie = bkutil.newBookie();
- assertEquals("Replacement bookie didn't start",
- ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
- cluster.transitionToActive(1); // should work fine now
-
- assertTrue(fs.exists(p1));
- assertFalse(fs.exists(p2));
- } finally {
- newBookie.shutdown();
- if (replacementBookie != null) {
- replacementBookie.shutdown();
- }
-
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- /**
- * Test that two namenodes can't continue as primary
- */
- @Test
- public void testMultiplePrimariesStarted() throws Exception {
- Path p1 = new Path("/testBKJMMultiplePrimary");
-
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = getConf();
- conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
- BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
- BKJMUtil.addJournalManagerDefinition(conf);
-
- cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
- .numDataNodes(0)
- .manageNameDfsSharedDirs(false)
- .checkExitOnShutdown(false)
- .build();
- NameNode nn1 = cluster.getNameNode(0);
- NameNode nn2 = cluster.getNameNode(1);
- cluster.waitActive();
- cluster.transitionToActive(0);
-
- FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
- fs.mkdirs(p1);
- nn1.getRpcServer().rollEditLog();
- cluster.transitionToActive(1);
- fs = cluster.getFileSystem(0); // get the older active server.
-
- try {
- System.out.println("DMS: > *************");
- boolean foo = fs.delete(p1, true);
- System.out.println("DMS: < ************* "+foo);
- fail("Log update on older active should cause it to exit");
- } catch (RemoteException re) {
- assertTrue(re.getClassName().contains("ExitException"));
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- /**
- * Use NameNode INTIALIZESHAREDEDITS to initialize the shared edits. i.e. copy
- * the edits log segments to new bkjm shared edits.
- *
- * @throws Exception
- */
- @Test
- public void testInitializeBKSharedEdits() throws Exception {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = getConf();
- HAUtil.setAllowStandbyReads(conf, true);
-
- MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
- cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
- .numDataNodes(0).build();
- cluster.waitActive();
- // Shutdown and clear the current filebased shared dir.
- cluster.shutdownNameNodes();
- File shareddir = new File(cluster.getSharedEditsDir(0, 1));
- assertTrue("Initial Shared edits dir not fully deleted",
- FileUtil.fullyDelete(shareddir));
-
- // Check namenodes should not start without shared dir.
- assertCanNotStartNamenode(cluster, 0);
- assertCanNotStartNamenode(cluster, 1);
-
- // Configure bkjm as new shared edits dir in both namenodes
- Configuration nn1Conf = cluster.getConfiguration(0);
- Configuration nn2Conf = cluster.getConfiguration(1);
- nn1Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
- .createJournalURI("/initializeSharedEdits").toString());
- nn2Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
- .createJournalURI("/initializeSharedEdits").toString());
- BKJMUtil.addJournalManagerDefinition(nn1Conf);
- BKJMUtil.addJournalManagerDefinition(nn2Conf);
-
- // Initialize the BKJM shared edits.
- assertFalse(NameNode.initializeSharedEdits(nn1Conf));
-
- // NameNode should be able to start and should be in sync with BKJM as
- // shared dir
- assertCanStartHANameNodes(cluster, conf, "/testBKJMInitialize");
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- private void assertCanNotStartNamenode(MiniDFSCluster cluster, int nnIndex) {
- try {
- cluster.restartNameNode(nnIndex, false);
- fail("Should not have been able to start NN" + (nnIndex)
- + " without shared dir");
- } catch (IOException ioe) {
- LOG.info("Got expected exception", ioe);
- GenericTestUtils.assertExceptionContains(
- "storage directory does not exist or is not accessible", ioe);
- }
- }
-
- private void assertCanStartHANameNodes(MiniDFSCluster cluster,
- Configuration conf, String path) throws ServiceFailedException,
- IOException, URISyntaxException, InterruptedException {
- // Now should be able to start both NNs. Pass "false" here so that we don't
- // try to waitActive on all NNs, since the second NN doesn't exist yet.
- cluster.restartNameNode(0, false);
- cluster.restartNameNode(1, true);
-
- // Make sure HA is working.
- cluster
- .getNameNode(0)
- .getRpcServer()
- .transitionToActive(
- new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
- FileSystem fs = null;
- try {
- Path newPath = new Path(path);
- fs = HATestUtil.configureFailoverFs(cluster, conf);
- assertTrue(fs.mkdirs(newPath));
- HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
- cluster.getNameNode(1));
- assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
- newPath.toString(), false).isDir());
- } finally {
- if (fs != null) {
- fs.close();
- }
- }
- }
-
- /**
- * NameNode should load the edits correctly if the applicable edits are
- * present in the BKJM.
- */
- @Test
- public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = getConf();
- conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
- .createJournalURI("/correctEditLogSelection").toString());
- BKJMUtil.addJournalManagerDefinition(conf);
-
- cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(0)
- .manageNameDfsSharedDirs(false).build();
- NameNode nn1 = cluster.getNameNode(0);
- NameNode nn2 = cluster.getNameNode(1);
- cluster.waitActive();
- cluster.transitionToActive(0);
- nn1.getRpcServer().rollEditLog(); // Roll Edits from current Active.
- // Transition to standby current active gracefully.
- cluster.transitionToStandby(0);
- // Make the other Active and Roll edits multiple times
- cluster.transitionToActive(1);
- nn2.getRpcServer().rollEditLog();
- nn2.getRpcServer().rollEditLog();
- // Now One more failover. So NN1 should be able to failover successfully.
- cluster.transitionToStandby(1);
- cluster.transitionToActive(0);
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
deleted file mode 100644
index f3f6ce5..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.Random;
-
-import org.apache.bookkeeper.util.LocalBookKeeper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-
-public class TestBookKeeperConfiguration {
- private static final Log LOG = LogFactory
- .getLog(TestBookKeeperConfiguration.class);
- private static final int ZK_SESSION_TIMEOUT = 5000;
- private static final String HOSTPORT = "127.0.0.1:2181";
- private static final int CONNECTION_TIMEOUT = 30000;
- private static NIOServerCnxnFactory serverFactory;
- private static ZooKeeperServer zks;
- private static ZooKeeper zkc;
- private static int ZooKeeperDefaultPort = 2181;
- private static File ZkTmpDir;
- private BookKeeperJournalManager bkjm;
- private static final String BK_ROOT_PATH = "/ledgers";
-
- private static ZooKeeper connectZooKeeper(String ensemble)
- throws IOException, KeeperException, InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
-
- ZooKeeper zkc = new ZooKeeper(HOSTPORT, ZK_SESSION_TIMEOUT, new Watcher() {
- public void process(WatchedEvent event) {
- if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
- latch.countDown();
- }
- }
- });
- if (!latch.await(ZK_SESSION_TIMEOUT, TimeUnit.MILLISECONDS)) {
- throw new IOException("Zookeeper took too long to connect");
- }
- return zkc;
- }
-
- private NamespaceInfo newNSInfo() {
- Random r = new Random();
- return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
- }
-
- @BeforeClass
- public static void setupZooKeeper() throws Exception {
- // create a ZooKeeper server(dataDir, dataLogDir, port)
- LOG.info("Starting ZK server");
- ZkTmpDir = File.createTempFile("zookeeper", "test");
- ZkTmpDir.delete();
- ZkTmpDir.mkdir();
-
- try {
- zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
- serverFactory = new NIOServerCnxnFactory();
- serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
- serverFactory.startup(zks);
- } catch (Exception e) {
- LOG.error("Exception while instantiating ZooKeeper", e);
- }
-
- boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
- LOG.debug("ZooKeeper server up: " + b);
- }
-
- @Before
- public void setup() throws Exception {
- zkc = connectZooKeeper(HOSTPORT);
- try {
- ZKUtil.deleteRecursive(zkc, BK_ROOT_PATH);
- } catch (KeeperException.NoNodeException e) {
- LOG.debug("Ignoring no node exception on cleanup", e);
- } catch (Exception e) {
- LOG.error("Exception when deleting bookie root path in zk", e);
- }
- }
-
- @After
- public void teardown() throws Exception {
- if (null != zkc) {
- zkc.close();
- }
- if (null != bkjm) {
- bkjm.close();
- }
- }
-
- @AfterClass
- public static void teardownZooKeeper() throws Exception {
- if (null != zkc) {
- zkc.close();
- }
- }
-
- /**
- * Verify the BKJM is creating the bookie available path configured in
- * 'dfs.namenode.bookkeeperjournal.zk.availablebookies'
- */
- @Test
- public void testWithConfiguringBKAvailablePath() throws Exception {
- // set Bookie available path in the configuration
- String bkAvailablePath
- = BookKeeperJournalManager.BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT;
- Configuration conf = new Configuration();
- conf.setStrings(BookKeeperJournalManager.BKJM_ZK_LEDGERS_AVAILABLE_PATH,
- bkAvailablePath);
- Assert.assertNull(bkAvailablePath + " already exists", zkc.exists(
- bkAvailablePath, false));
- NamespaceInfo nsi = newNSInfo();
- bkjm = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"),
- nsi);
- bkjm.format(nsi);
- Assert.assertNotNull("Bookie available path : " + bkAvailablePath
- + " doesn't exists", zkc.exists(bkAvailablePath, false));
- }
-
- /**
- * Verify the BKJM is creating the bookie available default path, when there
- * is no 'dfs.namenode.bookkeeperjournal.zk.availablebookies' configured
- */
- @Test
- public void testDefaultBKAvailablePath() throws Exception {
- Configuration conf = new Configuration();
- Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists(
- BK_ROOT_PATH, false));
- NamespaceInfo nsi = newNSInfo();
- bkjm = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"),
- nsi);
- bkjm.format(nsi);
- Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH
- + " doesn't exists", zkc.exists(BK_ROOT_PATH, false));
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
deleted file mode 100644
index 52e4568..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Unit test for the bkjm's streams
- */
-public class TestBookKeeperEditLogStreams {
- static final Log LOG = LogFactory.getLog(TestBookKeeperEditLogStreams.class);
-
- private static BKJMUtil bkutil;
- private final static int numBookies = 3;
-
- @BeforeClass
- public static void setupBookkeeper() throws Exception {
- bkutil = new BKJMUtil(numBookies);
- bkutil.start();
- }
-
- @AfterClass
- public static void teardownBookkeeper() throws Exception {
- bkutil.teardown();
- }
-
- /**
- * Test that bkjm will refuse open a stream on an empty
- * ledger.
- */
- @Test
- public void testEmptyInputStream() throws Exception {
- ZooKeeper zk = BKJMUtil.connectZooKeeper();
-
- BookKeeper bkc = new BookKeeper(new ClientConfiguration(), zk);
- try {
- LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32, "foobar"
- .getBytes());
- lh.close();
-
- EditLogLedgerMetadata metadata = new EditLogLedgerMetadata("/foobar",
- HdfsServerConstants.NAMENODE_LAYOUT_VERSION, lh.getId(), 0x1234);
- try {
- new BookKeeperEditLogInputStream(lh, metadata, -1);
- fail("Shouldn't get this far, should have thrown");
- } catch (IOException ioe) {
- assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
- }
-
- metadata = new EditLogLedgerMetadata("/foobar",
- HdfsServerConstants.NAMENODE_LAYOUT_VERSION, lh.getId(), 0x1234);
- try {
- new BookKeeperEditLogInputStream(lh, metadata, 0);
- fail("Shouldn't get this far, should have thrown");
- } catch (IOException ioe) {
- assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
- }
- } finally {
- bkc.close();
- zk.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
deleted file mode 100644
index b8fc30d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-import java.net.BindException;
-import java.util.Random;
-
-/**
- * Runs the same tests as TestStandbyCheckpoints, but
- * using a bookkeeper journal manager as the shared directory
- */
-public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
- //overwrite the nn count
- static{
- TestStandbyCheckpoints.NUM_NNS = 2;
- }
- private static BKJMUtil bkutil = null;
- static int numBookies = 3;
- static int journalCount = 0;
- private final Random random = new Random();
-
- private static final Log LOG = LogFactory.getLog(TestStandbyCheckpoints.class);
-
- @SuppressWarnings("rawtypes")
- @Override
- @Before
- public void setupCluster() throws Exception {
- Configuration conf = setupCommonConfig();
- conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
- BKJMUtil.createJournalURI("/checkpointing" + journalCount++)
- .toString());
- BKJMUtil.addJournalManagerDefinition(conf);
-
- int retryCount = 0;
- while (true) {
- try {
- int basePort = 10060 + random.nextInt(100) * 2;
- MiniDFSNNTopology topology = new MiniDFSNNTopology()
- .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
- .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort))
- .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1)));
-
- cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(topology)
- .numDataNodes(1)
- .manageNameDfsSharedDirs(false)
- .build();
- cluster.waitActive();
-
- setNNs();
- fs = HATestUtil.configureFailoverFs(cluster, conf);
-
- cluster.transitionToActive(0);
- ++retryCount;
- break;
- } catch (BindException e) {
- LOG.info("Set up MiniDFSCluster failed due to port conflicts, retry "
- + retryCount + " times");
- }
- }
- }
-
- @BeforeClass
- public static void startBK() throws Exception {
- journalCount = 0;
- bkutil = new BKJMUtil(numBookies);
- bkutil.start();
- }
-
- @AfterClass
- public static void shutdownBK() throws Exception {
- if (bkutil != null) {
- bkutil.teardown();
- }
- }
-
- @Override
- public void testCheckpointCancellation() throws Exception {
- // Overriden as the implementation in the superclass assumes that writes
- // are to a file. This should be fixed at some point
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
deleted file mode 100644
index 07fcd72..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
+++ /dev/null
@@ -1,984 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.spy;
-import org.junit.Test;
-import org.junit.Before;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
-import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TestBookKeeperJournalManager {
- static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
-
- private static final long DEFAULT_SEGMENT_SIZE = 1000;
-
- protected static Configuration conf = new Configuration();
- private ZooKeeper zkc;
- private static BKJMUtil bkutil;
- static int numBookies = 3;
- private BookieServer newBookie;
-
- @BeforeClass
- public static void setupBookkeeper() throws Exception {
- bkutil = new BKJMUtil(numBookies);
- bkutil.start();
- }
-
- @AfterClass
- public static void teardownBookkeeper() throws Exception {
- bkutil.teardown();
- }
-
- @Before
- public void setup() throws Exception {
- zkc = BKJMUtil.connectZooKeeper();
- }
-
- @After
- public void teardown() throws Exception {
- zkc.close();
- if (newBookie != null) {
- newBookie.shutdown();
- }
- }
-
- private NamespaceInfo newNSInfo() {
- Random r = new Random();
- return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
- }
-
- @Test
- public void testSimpleWrite() throws Exception {
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
- bkjm.format(nsi);
-
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long i = 1 ; i <= 100; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(1, 100);
-
- String zkpath = bkjm.finalizedLedgerZNode(1, 100);
-
- assertNotNull(zkc.exists(zkpath, false));
- assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
- }
-
- @Test
- public void testNumberOfTransactions() throws Exception {
- NamespaceInfo nsi = newNSInfo();
-
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
- bkjm.format(nsi);
-
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long i = 1 ; i <= 100; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(1, 100);
-
- long numTrans = bkjm.getNumberOfTransactions(1, true);
- assertEquals(100, numTrans);
- }
-
- @Test
- public void testNumberOfTransactionsWithGaps() throws Exception {
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi);
- bkjm.format(nsi);
-
- long txid = 1;
- for (long i = 0; i < 3; i++) {
- long start = txid;
- EditLogOutputStream out = bkjm.startLogSegment(start,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(start, txid-1);
- assertNotNull(
- zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
- }
- zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1,
- DEFAULT_SEGMENT_SIZE*2), -1);
-
- long numTrans = bkjm.getNumberOfTransactions(1, true);
- assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
-
- try {
- numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true);
- fail("Should have thrown corruption exception by this point");
- } catch (JournalManager.CorruptionException ce) {
- // if we get here, everything is going good
- }
-
- numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true);
- assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
- }
-
- @Test
- public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi);
- bkjm.format(nsi);
-
- long txid = 1;
- for (long i = 0; i < 3; i++) {
- long start = txid;
- EditLogOutputStream out = bkjm.startLogSegment(start,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
-
- out.close();
- bkjm.finalizeLogSegment(start, (txid-1));
- assertNotNull(
- zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
- }
- long start = txid;
- EditLogOutputStream out = bkjm.startLogSegment(start,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE/2; j++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
- out.setReadyToFlush();
- out.flush();
- out.abort();
- out.close();
-
- long numTrans = bkjm.getNumberOfTransactions(1, true);
- assertEquals((txid-1), numTrans);
- }
-
- /**
- * Create a bkjm namespace, write a journal from txid 1, close stream.
- * Try to create a new journal from txid 1. Should throw an exception.
- */
- @Test
- public void testWriteRestartFrom1() throws Exception {
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi);
- bkjm.format(nsi);
-
- long txid = 1;
- long start = txid;
- EditLogOutputStream out = bkjm.startLogSegment(txid,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(start, (txid-1));
-
- txid = 1;
- try {
- out = bkjm.startLogSegment(txid,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- fail("Shouldn't be able to start another journal from " + txid
- + " when one already exists");
- } catch (Exception ioe) {
- LOG.info("Caught exception as expected", ioe);
- }
-
- // test border case
- txid = DEFAULT_SEGMENT_SIZE;
- try {
- out = bkjm.startLogSegment(txid,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- fail("Shouldn't be able to start another journal from " + txid
- + " when one already exists");
- } catch (IOException ioe) {
- LOG.info("Caught exception as expected", ioe);
- }
-
- // open journal continuing from before
- txid = DEFAULT_SEGMENT_SIZE + 1;
- start = txid;
- out = bkjm.startLogSegment(start,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- assertNotNull(out);
-
- for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(start, (txid-1));
-
- // open journal arbitarily far in the future
- txid = DEFAULT_SEGMENT_SIZE * 4;
- out = bkjm.startLogSegment(txid,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- assertNotNull(out);
- }
-
- @Test
- public void testTwoWriters() throws Exception {
- long start = 1;
- NamespaceInfo nsi = newNSInfo();
-
- BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
- bkjm1.format(nsi);
-
- BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
-
-
- EditLogOutputStream out1 = bkjm1.startLogSegment(start,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- try {
- bkjm2.startLogSegment(start,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- fail("Shouldn't have been able to open the second writer");
- } catch (IOException ioe) {
- LOG.info("Caught exception as expected", ioe);
- }finally{
- out1.close();
- }
- }
-
- @Test
- public void testSimpleRead() throws Exception {
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-simpleread"),
- nsi);
- bkjm.format(nsi);
-
- final long numTransactions = 10000;
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);;
- for (long i = 1 ; i <= numTransactions; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(1, numTransactions);
-
- List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
- bkjm.selectInputStreams(in, 1, true);
- try {
- assertEquals(numTransactions,
- FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
- } finally {
- in.get(0).close();
- }
- }
-
- @Test
- public void testSimpleRecovery() throws Exception {
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"),
- nsi);
- bkjm.format(nsi);
-
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);;
- for (long i = 1 ; i <= 100; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.setReadyToFlush();
- out.flush();
-
- out.abort();
- out.close();
-
-
- assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
- assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false));
-
- bkjm.recoverUnfinalizedSegments();
-
- assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
- assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
- }
-
- /**
- * Test that if enough bookies fail to prevent an ensemble,
- * writes the bookkeeper will fail. Test that when once again
- * an ensemble is available, it can continue to write.
- */
- @Test
- public void testAllBookieFailure() throws Exception {
- // bookie to fail
- newBookie = bkutil.newBookie();
- BookieServer replacementBookie = null;
-
- try {
- int ensembleSize = numBookies + 1;
- assertEquals("New bookie didn't start",
- ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
- // ensure that the journal manager has to use all bookies,
- // so that a failure will fail the journal manager
- Configuration conf = new Configuration();
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
- ensembleSize);
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
- ensembleSize);
- long txid = 1;
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
- nsi);
- bkjm.format(nsi);
- EditLogOutputStream out = bkjm.startLogSegment(txid,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
-
- for (long i = 1 ; i <= 3; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
- out.setReadyToFlush();
- out.flush();
- newBookie.shutdown();
- assertEquals("New bookie didn't die",
- numBookies, bkutil.checkBookiesUp(numBookies, 10));
-
- try {
- for (long i = 1 ; i <= 3; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
- out.setReadyToFlush();
- out.flush();
- fail("should not get to this stage");
- } catch (IOException ioe) {
- LOG.debug("Error writing to bookkeeper", ioe);
- assertTrue("Invalid exception message",
- ioe.getMessage().contains("Failed to write to bookkeeper"));
- }
- replacementBookie = bkutil.newBookie();
-
- assertEquals("New bookie didn't start",
- numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
- bkjm.recoverUnfinalizedSegments();
- out = bkjm.startLogSegment(txid,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long i = 1 ; i <= 3; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
-
- out.setReadyToFlush();
- out.flush();
-
- } catch (Exception e) {
- LOG.error("Exception in test", e);
- throw e;
- } finally {
- if (replacementBookie != null) {
- replacementBookie.shutdown();
- }
- newBookie.shutdown();
-
- if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
- LOG.warn("Not all bookies from this test shut down, expect errors");
- }
- }
- }
-
- /**
- * Test that a BookKeeper JM can continue to work across the
- * failure of a bookie. This should be handled transparently
- * by bookkeeper.
- */
- @Test
- public void testOneBookieFailure() throws Exception {
- newBookie = bkutil.newBookie();
- BookieServer replacementBookie = null;
-
- try {
- int ensembleSize = numBookies + 1;
- assertEquals("New bookie didn't start",
- ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
- // ensure that the journal manager has to use all bookies,
- // so that a failure will fail the journal manager
- Configuration conf = new Configuration();
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
- ensembleSize);
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
- ensembleSize);
- long txid = 1;
-
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"),
- nsi);
- bkjm.format(nsi);
-
- EditLogOutputStream out = bkjm.startLogSegment(txid,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long i = 1 ; i <= 3; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
- out.setReadyToFlush();
- out.flush();
-
- replacementBookie = bkutil.newBookie();
- assertEquals("replacement bookie didn't start",
- ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
- newBookie.shutdown();
- assertEquals("New bookie didn't die",
- ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
- for (long i = 1 ; i <= 3; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(txid++);
- out.write(op);
- }
- out.setReadyToFlush();
- out.flush();
- } catch (Exception e) {
- LOG.error("Exception in test", e);
- throw e;
- } finally {
- if (replacementBookie != null) {
- replacementBookie.shutdown();
- }
- newBookie.shutdown();
-
- if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
- LOG.warn("Not all bookies from this test shut down, expect errors");
- }
- }
- }
-
- /**
- * If a journal manager has an empty inprogress node, ensure that we throw an
- * error, as this should not be possible, and some third party has corrupted
- * the zookeeper state
- */
- @Test
- public void testEmptyInprogressNode() throws Exception {
- URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
- nsi);
- bkjm.format(nsi);
-
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);;
- for (long i = 1; i <= 100; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(1, 100);
-
- out = bkjm.startLogSegment(101,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- out.close();
- bkjm.close();
- String inprogressZNode = bkjm.inprogressZNode(101);
- zkc.setData(inprogressZNode, new byte[0], -1);
-
- bkjm = new BookKeeperJournalManager(conf, uri, nsi);
- try {
- bkjm.recoverUnfinalizedSegments();
- fail("Should have failed. There should be no way of creating"
- + " an empty inprogess znode");
- } catch (IOException e) {
- // correct behaviour
- assertTrue("Exception different than expected", e.getMessage().contains(
- "Invalid/Incomplete data in znode"));
- } finally {
- bkjm.close();
- }
- }
-
- /**
- * If a journal manager has an corrupt inprogress node, ensure that we throw
- * an error, as this should not be possible, and some third party has
- * corrupted the zookeeper state
- */
- @Test
- public void testCorruptInprogressNode() throws Exception {
- URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
- nsi);
- bkjm.format(nsi);
-
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);;
- for (long i = 1; i <= 100; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(1, 100);
-
- out = bkjm.startLogSegment(101,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- out.close();
- bkjm.close();
-
- String inprogressZNode = bkjm.inprogressZNode(101);
- zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
-
- bkjm = new BookKeeperJournalManager(conf, uri, nsi);
- try {
- bkjm.recoverUnfinalizedSegments();
- fail("Should have failed. There should be no way of creating"
- + " an empty inprogess znode");
- } catch (IOException e) {
- // correct behaviour
- assertTrue("Exception different than expected", e.getMessage().contains(
- "has no field named"));
- } finally {
- bkjm.close();
- }
- }
-
- /**
- * Cases can occur where we create a segment but crash before we even have the
- * chance to write the START_SEGMENT op. If this occurs we should warn, but
- * load as normal
- */
- @Test
- public void testEmptyInprogressLedger() throws Exception {
- URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
- nsi);
- bkjm.format(nsi);
-
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);;
- for (long i = 1; i <= 100; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(1, 100);
-
- out = bkjm.startLogSegment(101,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- out.close();
- bkjm.close();
-
- bkjm = new BookKeeperJournalManager(conf, uri, nsi);
- bkjm.recoverUnfinalizedSegments();
- out = bkjm.startLogSegment(101,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long i = 1; i <= 100; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(101, 200);
-
- bkjm.close();
- }
-
- /**
- * Test that if we fail between finalizing an inprogress and deleting the
- * corresponding inprogress znode.
- */
- @Test
- public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
- URI uri = BKJMUtil
- .createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
- nsi);
- bkjm.format(nsi);
-
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);;
- for (long i = 1; i <= 100; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.close();
-
- String inprogressZNode = bkjm.inprogressZNode(1);
- String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100);
- assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode,
- null));
- assertNull("finalized znode exists", zkc.exists(finalizedZNode, null));
-
- byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
-
- // finalize
- bkjm = new BookKeeperJournalManager(conf, uri, nsi);
- bkjm.recoverUnfinalizedSegments();
- bkjm.close();
-
- assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null));
- assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode,
- null));
-
- zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- // should work fine
- bkjm = new BookKeeperJournalManager(conf, uri, nsi);
- bkjm.recoverUnfinalizedSegments();
- bkjm.close();
- }
-
- /**
- * Tests that the edit log file meta data reading from ZooKeeper should be
- * able to handle the NoNodeException. bkjm.getInputStream(fromTxId,
- * inProgressOk) should suppress the NoNodeException and continue. HDFS-3441.
- */
- @Test
- public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
- URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
- nsi);
- bkjm.format(nsi);
-
- try {
- // start new inprogress log segment with txid=1
- // and write transactions till txid=50
- String zkpath1 = startAndFinalizeLogSegment(bkjm, 1, 50);
-
- // start new inprogress log segment with txid=51
- // and write transactions till txid=100
- String zkpath2 = startAndFinalizeLogSegment(bkjm, 51, 100);
-
- // read the metadata from ZK. Here simulating the situation
- // when reading,the edit log metadata can be removed by purger thread.
- ZooKeeper zkspy = spy(BKJMUtil.connectZooKeeper());
- bkjm.setZooKeeper(zkspy);
- Mockito.doThrow(
- new KeeperException.NoNodeException(zkpath2 + " doesn't exists"))
- .when(zkspy).getData(zkpath2, false, null);
-
- List<EditLogLedgerMetadata> ledgerList = bkjm.getLedgerList(false);
- assertEquals("List contains the metadata of non exists path.", 1,
- ledgerList.size());
- assertEquals("LogLedgerMetadata contains wrong zk paths.", zkpath1,
- ledgerList.get(0).getZkPath());
- } finally {
- bkjm.close();
- }
- }
-
- private enum ThreadStatus {
- COMPLETED, GOODEXCEPTION, BADEXCEPTION;
- };
-
- /**
- * Tests that concurrent calls to format will still allow one to succeed.
- */
- @Test
- public void testConcurrentFormat() throws Exception {
- final URI uri = BKJMUtil.createJournalURI("/hdfsjournal-concurrentformat");
- final NamespaceInfo nsi = newNSInfo();
-
- // populate with data first
- BookKeeperJournalManager bkjm
- = new BookKeeperJournalManager(conf, uri, nsi);
- bkjm.format(nsi);
- for (int i = 1; i < 100*2; i += 2) {
- bkjm.startLogSegment(i, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- bkjm.finalizeLogSegment(i, i+1);
- }
- bkjm.close();
-
- final int numThreads = 40;
- List<Callable<ThreadStatus>> threads
- = new ArrayList<Callable<ThreadStatus>>();
- final CyclicBarrier barrier = new CyclicBarrier(numThreads);
-
- for (int i = 0; i < numThreads; i++) {
- threads.add(new Callable<ThreadStatus>() {
- public ThreadStatus call() {
- BookKeeperJournalManager bkjm = null;
- try {
- bkjm = new BookKeeperJournalManager(conf, uri, nsi);
- barrier.await();
- bkjm.format(nsi);
- return ThreadStatus.COMPLETED;
- } catch (IOException ioe) {
- LOG.info("Exception formatting ", ioe);
- return ThreadStatus.GOODEXCEPTION;
- } catch (InterruptedException ie) {
- LOG.error("Interrupted. Something is broken", ie);
- Thread.currentThread().interrupt();
- return ThreadStatus.BADEXCEPTION;
- } catch (Exception e) {
- LOG.error("Some other bad exception", e);
- return ThreadStatus.BADEXCEPTION;
- } finally {
- if (bkjm != null) {
- try {
- bkjm.close();
- } catch (IOException ioe) {
- LOG.error("Error closing journal manager", ioe);
- }
- }
- }
- }
- });
- }
- ExecutorService service = Executors.newFixedThreadPool(numThreads);
- List<Future<ThreadStatus>> statuses = service.invokeAll(threads, 60,
- TimeUnit.SECONDS);
- int numCompleted = 0;
- for (Future<ThreadStatus> s : statuses) {
- assertTrue(s.isDone());
- assertTrue("Thread threw invalid exception",
- s.get() == ThreadStatus.COMPLETED
- || s.get() == ThreadStatus.GOODEXCEPTION);
- if (s.get() == ThreadStatus.COMPLETED) {
- numCompleted++;
- }
- }
- LOG.info("Completed " + numCompleted + " formats");
- assertTrue("No thread managed to complete formatting", numCompleted > 0);
- }
-
- @Test(timeout = 120000)
- public void testDefaultAckQuorum() throws Exception {
- newBookie = bkutil.newBookie();
- int ensembleSize = numBookies + 1;
- int quorumSize = numBookies + 1;
- // ensure that the journal manager has to use all bookies,
- // so that a failure will fail the journal manager
- Configuration conf = new Configuration();
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
- ensembleSize);
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
- quorumSize);
- // sets 2 secs
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
- 2);
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
- bkjm.format(nsi);
- CountDownLatch sleepLatch = new CountDownLatch(1);
- sleepBookie(sleepLatch, newBookie);
-
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- int numTransactions = 100;
- for (long i = 1; i <= numTransactions; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- try {
- out.close();
- bkjm.finalizeLogSegment(1, numTransactions);
-
- List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
- bkjm.selectInputStreams(in, 1, true);
- try {
- assertEquals(numTransactions,
- FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
- } finally {
- in.get(0).close();
- }
- fail("Should throw exception as not enough non-faulty bookies available!");
- } catch (IOException ioe) {
- // expected
- }
- }
-
- /**
- * Test ack quorum feature supported by bookkeeper. Keep ack quorum bookie
- * alive and sleep all the other bookies. Now the client would wait for the
- * acknowledgement from the ack size bookies and after receiving the success
- * response will continue writing. Non ack client will hang long time to add
- * entries.
- */
- @Test(timeout = 120000)
- public void testAckQuorum() throws Exception {
- // slow bookie
- newBookie = bkutil.newBookie();
- // make quorum size and ensemble size same to avoid the interleave writing
- // of the ledger entries
- int ensembleSize = numBookies + 1;
- int quorumSize = numBookies + 1;
- int ackSize = numBookies;
- // ensure that the journal manager has to use all bookies,
- // so that a failure will fail the journal manager
- Configuration conf = new Configuration();
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
- ensembleSize);
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
- quorumSize);
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ACK_QUORUM_SIZE,
- ackSize);
- // sets 60 minutes
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
- 3600);
-
- NamespaceInfo nsi = newNSInfo();
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
- bkjm.format(nsi);
- CountDownLatch sleepLatch = new CountDownLatch(1);
- sleepBookie(sleepLatch, newBookie);
-
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- int numTransactions = 100;
- for (long i = 1; i <= numTransactions; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(1, numTransactions);
-
- List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
- bkjm.selectInputStreams(in, 1, true);
- try {
- assertEquals(numTransactions,
- FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
- } finally {
- sleepLatch.countDown();
- in.get(0).close();
- bkjm.close();
- }
- }
-
- /**
- * Sleep a bookie until I count down the latch
- *
- * @param latch
- * Latch to wait on
- * @param bookie
- * bookie server
- * @throws Exception
- */
- private void sleepBookie(final CountDownLatch l, final BookieServer bookie)
- throws Exception {
-
- Thread sleeper = new Thread() {
- public void run() {
- try {
- bookie.suspendProcessing();
- l.await(60, TimeUnit.SECONDS);
- bookie.resumeProcessing();
- } catch (Exception e) {
- LOG.error("Error suspending bookie", e);
- }
- }
- };
- sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
- sleeper.start();
- }
-
-
- private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
- int startTxid, int endTxid) throws IOException, KeeperException,
- InterruptedException {
- EditLogOutputStream out = bkjm.startLogSegment(startTxid,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long i = startTxid; i <= endTxid; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- // finalize the inprogress_1 log segment.
- bkjm.finalizeLogSegment(startTxid, endTxid);
- String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid);
- assertNotNull(zkc.exists(zkpath1, false));
- assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false));
- return zkpath1;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
deleted file mode 100644
index f5b86bc..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
-import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestBookKeeperSpeculativeRead {
- private static final Log LOG = LogFactory
- .getLog(TestBookKeeperSpeculativeRead.class);
-
- private ZooKeeper zkc;
- private static BKJMUtil bkutil;
- private static int numLocalBookies = 1;
- private static List<BookieServer> bks = new ArrayList<BookieServer>();
-
- @BeforeClass
- public static void setupBookkeeper() throws Exception {
- bkutil = new BKJMUtil(1);
- bkutil.start();
- }
-
- @AfterClass
- public static void teardownBookkeeper() throws Exception {
- bkutil.teardown();
- for (BookieServer bk : bks) {
- bk.shutdown();
- }
- }
-
- @Before
- public void setup() throws Exception {
- zkc = BKJMUtil.connectZooKeeper();
- }
-
- @After
- public void teardown() throws Exception {
- zkc.close();
- }
-
- private NamespaceInfo newNSInfo() {
- Random r = new Random();
- return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
- }
-
- /**
- * Test speculative read feature supported by bookkeeper. Keep one bookie
- * alive and sleep all the other bookies. Non spec client will hang for long
- * time to read the entries from the bookkeeper.
- */
- @Test(timeout = 120000)
- public void testSpeculativeRead() throws Exception {
- // starting 9 more servers
- for (int i = 1; i < 10; i++) {
- bks.add(bkutil.newBookie());
- }
- NamespaceInfo nsi = newNSInfo();
- Configuration conf = new Configuration();
- int ensembleSize = numLocalBookies + 9;
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
- ensembleSize);
- conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
- ensembleSize);
- conf.setInt(
- BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
- 100);
- // sets 60 minute
- conf.setInt(
- BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 3600);
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-specread"), nsi);
- bkjm.format(nsi);
-
- final long numTransactions = 1000;
- EditLogOutputStream out = bkjm.startLogSegment(1,
- NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
- for (long i = 1; i <= numTransactions; i++) {
- FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
- op.setTransactionId(i);
- out.write(op);
- }
- out.close();
- bkjm.finalizeLogSegment(1, numTransactions);
-
- List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
- bkjm.selectInputStreams(in, 1, true);
-
- // sleep 9 bk servers. Now only one server is running and responding to the
- // clients
- CountDownLatch sleepLatch = new CountDownLatch(1);
- for (final BookieServer bookie : bks) {
- sleepBookie(sleepLatch, bookie);
- }
- try {
- assertEquals(numTransactions,
- FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
- } finally {
- in.get(0).close();
- sleepLatch.countDown();
- bkjm.close();
- }
- }
-
- /**
- * Sleep a bookie until I count down the latch
- *
- * @param latch
- * latch to wait on
- * @param bookie
- * bookie server
- * @throws Exception
- */
- private void sleepBookie(final CountDownLatch latch, final BookieServer bookie)
- throws Exception {
-
- Thread sleeper = new Thread() {
- public void run() {
- try {
- bookie.suspendProcessing();
- latch.await(2, TimeUnit.MINUTES);
- bookie.resumeProcessing();
- } catch (Exception e) {
- LOG.error("Error suspending bookie", e);
- }
- }
- };
- sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
- sleeper.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java
deleted file mode 100644
index ef7f708..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBootstrapStandbyWithBKJM.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.File;
-import java.io.FileFilter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints.SlowCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class TestBootstrapStandbyWithBKJM {
- private static BKJMUtil bkutil;
- protected MiniDFSCluster cluster;
-
- @BeforeClass
- public static void setupBookkeeper() throws Exception {
- bkutil = new BKJMUtil(3);
- bkutil.start();
- }
-
- @AfterClass
- public static void teardownBookkeeper() throws Exception {
- bkutil.teardown();
- }
-
- @After
- public void teardown() {
- if (cluster != null) {
- cluster.shutdown();
- cluster = null;
- }
- }
-
- @Before
- public void setUp() throws Exception {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
- conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
- .createJournalURI("/bootstrapStandby").toString());
- BKJMUtil.addJournalManagerDefinition(conf);
- conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
- conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
- SlowCodec.class.getCanonicalName());
- CompressionCodecFactory.setCodecClasses(conf,
- ImmutableList.<Class> of(SlowCodec.class));
- MiniDFSNNTopology topology = new MiniDFSNNTopology()
- .addNameservice(new MiniDFSNNTopology.NSConf("ns1").addNN(
- new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001)).addNN(
- new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
- cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
- .numDataNodes(1).manageNameDfsSharedDirs(false).build();
- cluster.waitActive();
- }
-
- /**
- * While boostrapping, in_progress transaction entries should be skipped.
- * Bootstrap usage for BKJM : "-force", "-nonInteractive", "-skipSharedEditsCheck"
- */
- @Test
- public void testBootstrapStandbyWithActiveNN() throws Exception {
- // make nn0 active
- cluster.transitionToActive(0);
-
- // do ops and generate in-progress edit log data
- Configuration confNN1 = cluster.getConfiguration(1);
- DistributedFileSystem dfs = (DistributedFileSystem) HATestUtil
- .configureFailoverFs(cluster, confNN1);
- for (int i = 1; i <= 10; i++) {
- dfs.mkdirs(new Path("/test" + i));
- }
- dfs.close();
-
- // shutdown nn1 and delete its edit log files
- cluster.shutdownNameNode(1);
- deleteEditLogIfExists(confNN1);
- cluster.getNameNodeRpc(0).setSafeMode(SafeModeAction.SAFEMODE_ENTER, true);
- cluster.getNameNodeRpc(0).saveNamespace(0, 0);
- cluster.getNameNodeRpc(0).setSafeMode(SafeModeAction.SAFEMODE_LEAVE, true);
-
- // check without -skipSharedEditsCheck, Bootstrap should fail for BKJM
- // immediately after saveNamespace
- int rc = BootstrapStandby.run(new String[] { "-force", "-nonInteractive" },
- confNN1);
- Assert.assertEquals("Mismatches return code", 6, rc);
-
- // check with -skipSharedEditsCheck
- rc = BootstrapStandby.run(new String[] { "-force", "-nonInteractive",
- "-skipSharedEditsCheck" }, confNN1);
- Assert.assertEquals("Mismatches return code", 0, rc);
-
- // Checkpoint as fast as we can, in a tight loop.
- confNN1.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);
- cluster.restartNameNode(1);
- cluster.transitionToStandby(1);
-
- NameNode nn0 = cluster.getNameNode(0);
- HATestUtil.waitForStandbyToCatchUp(nn0, cluster.getNameNode(1));
- long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
- .getFSImage().getMostRecentCheckpointTxId();
- HATestUtil.waitForCheckpoint(cluster, 1,
- ImmutableList.of((int) expectedCheckpointTxId));
-
- // Should have copied over the namespace
- FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
- ImmutableList.of((int) expectedCheckpointTxId));
- FSImageTestUtil.assertNNFilesMatch(cluster);
- }
-
- private void deleteEditLogIfExists(Configuration confNN1) {
- String editDirs = confNN1.get(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
- String[] listEditDirs = StringUtils.split(editDirs, ',');
- Assert.assertTrue("Wrong edit directory path!", listEditDirs.length > 0);
-
- for (String dir : listEditDirs) {
- File curDir = new File(dir, "current");
- File[] listFiles = curDir.listFiles(new FileFilter() {
- @Override
- public boolean accept(File f) {
- if (!f.getName().startsWith("edits")) {
- return true;
- }
- return false;
- }
- });
- if (listFiles != null && listFiles.length > 0) {
- for (File file : listFiles) {
- Assert.assertTrue("Failed to delete edit files!", file.delete());
- }
- }
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org