You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/04 18:31:09 UTC
svn commit: r1394145 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/bin/
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/
bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/
Author: ivank
Date: Thu Oct 4 16:31:09 2012
New Revision: 1394145
URL: http://svn.apache.org/viewvc?rev=1394145&view=rev
Log:
BOOKKEEPER-319: Manage auditing and replication processes (Vinay via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct 4 16:31:09 2012
@@ -148,6 +148,8 @@ Trunk (unreleased changes)
BOOKKEEPER-278: Ability to disable auto recovery temporarily (rakeshr via ivank)
+ BOOKKEEPER-319: Manage auditing and replication processes (Vinay via ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper Thu Oct 4 16:31:09 2012
@@ -78,6 +78,7 @@ bookkeeper_help() {
Usage: bookkeeper <command>
where command is one of:
bookie Run a bookie server
+ autorecovery Run AutoRecovery service daemon
localbookie <n> Run a test ensemble of <n> bookies locally
upgrade Upgrade bookie filesystem
shell Run shell for admin commands
@@ -165,6 +166,8 @@ OPTS="$OPTS -Dbookkeeper.log.file=$BOOKI
cd "$BK_HOME"
if [ $COMMAND == "bookie" ]; then
exec java $OPTS $JMX_ARGS org.apache.bookkeeper.proto.BookieServer --conf $BOOKIE_CONF $@
+elif [ $COMMAND == "autorecovery" ]; then
+ exec java $OPTS $JMX_ARGS org.apache.bookkeeper.replication.AutoRecoveryMain --conf $BOOKIE_CONF $@
elif [ $COMMAND == "localbookie" ]; then
NUMBER=$1
shift
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh Thu Oct 4 16:31:09 2012
@@ -61,6 +61,9 @@ case $command in
(bookie)
echo "doing $startStop $command ..."
;;
+ (autorecovery)
+ echo "doing $startStop $command ..."
+ ;;
(*)
echo "Error: unknown service name $command"
usage
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java Thu Oct 4 16:31:09 2012
@@ -66,7 +66,7 @@ public class AuditorElector {
private final ZooKeeper zkc;
private String myVote;
- private Auditor auditor;
+ Auditor auditor;
private volatile boolean running = true;
/**
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java?rev=1394145&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java Thu Oct 4 16:31:09 2012
@@ -0,0 +1,267 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+
+import org.apache.bookkeeper.bookie.ExitCode;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to start/stop the AutoRecovery daemons Auditor and ReplicationWorker
+ */
+public class AutoRecoveryMain {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AutoRecoveryMain.class);
+
+ private ServerConfiguration conf;
+ private ZooKeeper zk;
+ AuditorElector auditorElector;
+ ReplicationWorker replicationWorker;
+ private AutoRecoveryDeathWatcher deathWatcher;
+ private int exitCode;
+ private volatile boolean shuttingDown = false;
+
+ public AutoRecoveryMain(ServerConfiguration conf) throws IOException,
+ InterruptedException, KeeperException, UnavailableException,
+ CompatibilityException {
+ this.conf = conf;
+ zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(),
+ conf.getZkTimeout());
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // Check for expired connection.
+ if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
+ LOG.error("ZK client connection to the"
+ + " ZK server has expired!");
+ shutdown(ExitCode.ZK_EXPIRED);
+ }
+ }
+ };
+ zk.register(watcher);
+ auditorElector = new AuditorElector(
+ getMyBookieAddress(conf).toString(), conf, zk);
+ replicationWorker = new ReplicationWorker(zk, conf,
+ getMyBookieAddress(conf));
+ deathWatcher = new AutoRecoveryDeathWatcher(this);
+ }
+
+ private static InetSocketAddress getMyBookieAddress(ServerConfiguration conf)
+ throws UnknownHostException {
+ return new InetSocketAddress(InetAddress.getLocalHost()
+ .getHostAddress(), conf.getBookiePort());
+ }
+
+ /*
+ * Start daemons
+ */
+ public void start() throws UnavailableException {
+ auditorElector.doElection();
+ replicationWorker.start();
+ deathWatcher.start();
+ }
+
+ /*
+ * Waits till all daemons joins
+ */
+ public void join() throws InterruptedException {
+ deathWatcher.join();
+ }
+
+ /*
+ * Shutdown all daemons gracefully
+ */
+ public void shutdown() {
+ shutdown(ExitCode.OK);
+ }
+
+ private void shutdown(int exitCode) {
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+ this.exitCode = exitCode;
+ try {
+ deathWatcher.interrupt();
+ deathWatcher.join();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ auditorElector.shutdown();
+ replicationWorker.shutdown();
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ private int getExitCode() {
+ return exitCode;
+ }
+
+ /*
+ * DeathWatcher for AutoRecovery daemons.
+ */
+ private static class AutoRecoveryDeathWatcher extends Thread {
+ private int watchInterval;
+ private AutoRecoveryMain autoRecoveryMain;
+
+ public AutoRecoveryDeathWatcher(AutoRecoveryMain autoRecoveryMain) {
+ super("AutoRecoveryDeathWatcher-"
+ + autoRecoveryMain.conf.getBookiePort());
+ this.autoRecoveryMain = autoRecoveryMain;
+ watchInterval = autoRecoveryMain.conf.getDeathWatchInterval();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(watchInterval);
+ } catch (InterruptedException ie) {
+ break;
+ }
+ // If any one service not running, then shutdown peer.
+ if (!autoRecoveryMain.auditorElector.isRunning()
+ || !autoRecoveryMain.replicationWorker.isRunning()) {
+ autoRecoveryMain.shutdown();
+ break;
+ }
+ }
+ }
+ }
+
+ private static final Options opts = new Options();
+ static {
+ opts.addOption("c", "conf", true, "Bookie server configuration");
+ opts.addOption("h", "help", false, "Print help message");
+ }
+
+ /*
+ * Print usage
+ */
+ private static void printUsage() {
+ HelpFormatter hf = new HelpFormatter();
+ hf.printHelp("AutoRecoveryMain [options]\n", opts);
+ }
+
+ /*
+ * load configurations from file.
+ */
+ private static void loadConfFile(ServerConfiguration conf, String confFile)
+ throws IllegalArgumentException {
+ try {
+ conf.loadConf(new File(confFile).toURI().toURL());
+ } catch (MalformedURLException e) {
+ LOG.error("Could not open configuration file: " + confFile, e);
+ throw new IllegalArgumentException();
+ } catch (ConfigurationException e) {
+ LOG.error("Malformed configuration file: " + confFile, e);
+ throw new IllegalArgumentException();
+ }
+ LOG.info("Using configuration file " + confFile);
+ }
+
+ /*
+ * Parse console args
+ */
+ private static ServerConfiguration parseArgs(String[] args)
+ throws IllegalArgumentException {
+ try {
+ BasicParser parser = new BasicParser();
+ CommandLine cmdLine = parser.parse(opts, args);
+
+ if (cmdLine.hasOption('h')) {
+ throw new IllegalArgumentException();
+ }
+
+ ServerConfiguration conf = new ServerConfiguration();
+ String[] leftArgs = cmdLine.getArgs();
+
+ if (cmdLine.hasOption('c')) {
+ if (null != leftArgs && leftArgs.length > 0) {
+ throw new IllegalArgumentException();
+ }
+ String confFile = cmdLine.getOptionValue("c");
+ loadConfFile(conf, confFile);
+ }
+
+ if (null != leftArgs && leftArgs.length > 0) {
+ throw new IllegalArgumentException();
+ }
+ return conf;
+ } catch (ParseException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public static void main(String[] args) {
+ ServerConfiguration conf = null;
+ try {
+ conf = parseArgs(args);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Error parsing command line arguments : ", iae);
+ System.err.println(iae.getMessage());
+ printUsage();
+ System.exit(ExitCode.INVALID_CONF);
+ }
+
+ try {
+ final AutoRecoveryMain autoRecoveryMain = new AutoRecoveryMain(conf);
+ autoRecoveryMain.start();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ autoRecoveryMain.shutdown();
+ LOG.info("Shutdown AutoRecoveryMain successfully");
+ }
+ });
+ LOG.info("Register shutdown hook successfully");
+ autoRecoveryMain.join();
+ System.exit(autoRecoveryMain.getExitCode());
+ } catch (Exception e) {
+ LOG.error("Exception running AutoRecoveryMain : ", e);
+ System.exit(ExitCode.SERVER_EXCEPTION);
+ }
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Thu Oct 4 16:31:09 2012
@@ -284,6 +284,9 @@ public class ReplicationWorker implement
* Stop the replication worker service
*/
public void shutdown() {
+ if (!workerRunning) {
+ return;
+ }
workerRunning = false;
try {
underreplicationManager.close();
@@ -310,6 +313,13 @@ public class ReplicationWorker implement
}
}
+    /**
+     * Reports whether this ReplicationWorker is still running.
+     *
+     * @return the current value of the worker's running flag, which
+     *         {@link #shutdown()} clears
+     */
+    boolean isRunning() {
+        return this.workerRunning;
+    }
+
private boolean isTargetBookieExistsInFragmentEnsemble(LedgerHandle lh,
LedgerFragment ledgerFragment) {
List<InetSocketAddress> ensemble = ledgerFragment.getEnsemble();
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java?rev=1394145&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java Thu Oct 4 16:31:09 2012
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+
+/*
+ * Tests for AutoRecoveryMain: startup and shutdown of the AuditorElector and
+ * the ReplicationWorker it manages.
+ */
+public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
+
+    public AutoRecoveryMainTest() {
+        super(3);
+    }
+
+    /*
+     * Test the startup of the AuditorElector and the ReplicationWorker.
+     */
+    public void testStartup() throws Exception {
+        AutoRecoveryMain main = new AutoRecoveryMain(bsConfs.get(0));
+        try {
+            main.start();
+            Thread.sleep(500);
+            assertTrue("AuditorElector should be running",
+                    main.auditorElector.isRunning());
+            assertTrue("Replication worker should be running",
+                    main.replicationWorker.isRunning());
+        } finally {
+            main.shutdown();
+        }
+    }
+
+    /*
+     * Test the shutdown of all daemons
+     */
+    public void testShutdown() throws Exception {
+        AutoRecoveryMain main = new AutoRecoveryMain(bsConfs.get(0));
+        try {
+            main.start();
+            Thread.sleep(500);
+            assertTrue("AuditorElector should be running",
+                    main.auditorElector.isRunning());
+            assertTrue("Replication worker should be running",
+                    main.replicationWorker.isRunning());
+
+            main.shutdown();
+            assertFalse("AuditorElector should not be running",
+                    main.auditorElector.isRunning());
+            assertFalse("Replication worker should not be running",
+                    main.replicationWorker.isRunning());
+        } finally {
+            // AutoRecoveryMain ignores repeated shutdown calls, so this only
+            // has an effect when an assertion failed before main.shutdown(),
+            // and keeps the ZK client and daemon threads from leaking.
+            main.shutdown();
+        }
+    }
+}