You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/04 18:31:09 UTC

svn commit: r1394145 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/bin/ bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/

Author: ivank
Date: Thu Oct  4 16:31:09 2012
New Revision: 1394145

URL: http://svn.apache.org/viewvc?rev=1394145&view=rev
Log:
BOOKKEEPER-319: Manage auditing and replication processes (Vinay via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
    zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct  4 16:31:09 2012
@@ -148,6 +148,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-278: Ability to disable auto recovery temporarily (rakeshr via ivank)
 
+        BOOKKEEPER-319: Manage auditing and replication processes (Vinay via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper Thu Oct  4 16:31:09 2012
@@ -78,6 +78,7 @@ bookkeeper_help() {
 Usage: bookkeeper <command>
 where command is one of:
     bookie              Run a bookie server
+    autorecovery        Run AutoRecovery service daemon
     localbookie <n>     Run a test ensemble of <n> bookies locally
     upgrade             Upgrade bookie filesystem
     shell               Run shell for admin commands
@@ -165,6 +166,8 @@ OPTS="$OPTS -Dbookkeeper.log.file=$BOOKI
 cd "$BK_HOME"
 if [ $COMMAND == "bookie" ]; then
     exec java $OPTS $JMX_ARGS org.apache.bookkeeper.proto.BookieServer --conf $BOOKIE_CONF $@
+elif [ $COMMAND == "autorecovery" ]; then
+    exec java $OPTS $JMX_ARGS org.apache.bookkeeper.replication.AutoRecoveryMain --conf $BOOKIE_CONF $@
 elif [ $COMMAND == "localbookie" ]; then
     NUMBER=$1
     shift

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper-daemon.sh Thu Oct  4 16:31:09 2012
@@ -61,6 +61,9 @@ case $command in
     (bookie)
         echo "doing $startStop $command ..."
         ;;
+    (autorecovery)
+        echo "doing $startStop $command ..."
+        ;;
     (*)
         echo "Error: unknown service name $command"
         usage

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java Thu Oct  4 16:31:09 2012
@@ -66,7 +66,7 @@ public class AuditorElector {
     private final ZooKeeper zkc;
 
     private String myVote;
-    private Auditor auditor;
+    Auditor auditor;
     private volatile boolean running = true;
 
     /**

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java?rev=1394145&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java Thu Oct  4 16:31:09 2012
@@ -0,0 +1,267 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+
+import org.apache.bookkeeper.bookie.ExitCode;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to start/stop the AutoRecovery daemons Auditor and ReplicationWorker
+ */
+public class AutoRecoveryMain {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(AutoRecoveryMain.class);
+
+    private ServerConfiguration conf;
+    private ZooKeeper zk;
+    AuditorElector auditorElector;
+    ReplicationWorker replicationWorker;
+    private AutoRecoveryDeathWatcher deathWatcher;
+    private int exitCode;
+    private volatile boolean shuttingDown = false;
+
+    public AutoRecoveryMain(ServerConfiguration conf) throws IOException,
+            InterruptedException, KeeperException, UnavailableException,
+            CompatibilityException {
+        this.conf = conf;
+        zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(),
+                conf.getZkTimeout());
+        Watcher watcher = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                // Check for expired connection.
+                if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
+                    LOG.error("ZK client connection to the"
+                            + " ZK server has expired!");
+                    shutdown(ExitCode.ZK_EXPIRED);
+                }
+            }
+        };
+        zk.register(watcher);
+        auditorElector = new AuditorElector(
+                getMyBookieAddress(conf).toString(), conf, zk);
+        replicationWorker = new ReplicationWorker(zk, conf,
+                getMyBookieAddress(conf));
+        deathWatcher = new AutoRecoveryDeathWatcher(this);
+    }
+
+    private static InetSocketAddress getMyBookieAddress(ServerConfiguration conf)
+            throws UnknownHostException {
+        return new InetSocketAddress(InetAddress.getLocalHost()
+                .getHostAddress(), conf.getBookiePort());
+    }
+
+    /*
+     * Start daemons
+     */
+    public void start() throws UnavailableException {
+        auditorElector.doElection();
+        replicationWorker.start();
+        deathWatcher.start();
+    }
+
+    /*
+     * Waits till all daemons joins
+     */
+    public void join() throws InterruptedException {
+        deathWatcher.join();
+    }
+
+    /*
+     * Shutdown all daemons gracefully
+     */
+    public void shutdown() {
+        shutdown(ExitCode.OK);
+    }
+
+    private void shutdown(int exitCode) {
+        if (shuttingDown) {
+            return;
+        }
+        shuttingDown = true;
+        this.exitCode = exitCode;
+        try {
+            deathWatcher.interrupt();
+            deathWatcher.join();
+        } catch (InterruptedException e) {
+            // Ignore
+        }
+        auditorElector.shutdown();
+        replicationWorker.shutdown();
+        try {
+            zk.close();
+        } catch (InterruptedException e) {
+            // Ignore
+        }
+    }
+
+    private int getExitCode() {
+        return exitCode;
+    }
+
+    /*
+     * DeathWatcher for AutoRecovery daemons.
+     */
+    private static class AutoRecoveryDeathWatcher extends Thread {
+        private int watchInterval;
+        private AutoRecoveryMain autoRecoveryMain;
+
+        public AutoRecoveryDeathWatcher(AutoRecoveryMain autoRecoveryMain) {
+            super("AutoRecoveryDeathWatcher-"
+                    + autoRecoveryMain.conf.getBookiePort());
+            this.autoRecoveryMain = autoRecoveryMain;
+            watchInterval = autoRecoveryMain.conf.getDeathWatchInterval();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(watchInterval);
+                } catch (InterruptedException ie) {
+                    break;
+                }
+                // If any one service not running, then shutdown peer.
+                if (!autoRecoveryMain.auditorElector.isRunning()
+                        || !autoRecoveryMain.replicationWorker.isRunning()) {
+                    autoRecoveryMain.shutdown();
+                    break;
+                }
+            }
+        }
+    }
+
+    private static final Options opts = new Options();
+    static {
+        opts.addOption("c", "conf", true, "Bookie server configuration");
+        opts.addOption("h", "help", false, "Print help message");
+    }
+
+    /*
+     * Print usage
+     */
+    private static void printUsage() {
+        HelpFormatter hf = new HelpFormatter();
+        hf.printHelp("AutoRecoveryMain [options]\n", opts);
+    }
+
+    /*
+     * load configurations from file.
+     */
+    private static void loadConfFile(ServerConfiguration conf, String confFile)
+            throws IllegalArgumentException {
+        try {
+            conf.loadConf(new File(confFile).toURI().toURL());
+        } catch (MalformedURLException e) {
+            LOG.error("Could not open configuration file: " + confFile, e);
+            throw new IllegalArgumentException();
+        } catch (ConfigurationException e) {
+            LOG.error("Malformed configuration file: " + confFile, e);
+            throw new IllegalArgumentException();
+        }
+        LOG.info("Using configuration file " + confFile);
+    }
+
+    /*
+     * Parse console args
+     */
+    private static ServerConfiguration parseArgs(String[] args)
+            throws IllegalArgumentException {
+        try {
+            BasicParser parser = new BasicParser();
+            CommandLine cmdLine = parser.parse(opts, args);
+
+            if (cmdLine.hasOption('h')) {
+                throw new IllegalArgumentException();
+            }
+
+            ServerConfiguration conf = new ServerConfiguration();
+            String[] leftArgs = cmdLine.getArgs();
+
+            if (cmdLine.hasOption('c')) {
+                if (null != leftArgs && leftArgs.length > 0) {
+                    throw new IllegalArgumentException();
+                }
+                String confFile = cmdLine.getOptionValue("c");
+                loadConfFile(conf, confFile);
+            }
+
+            if (null != leftArgs && leftArgs.length > 0) {
+                throw new IllegalArgumentException();
+            }
+            return conf;
+        } catch (ParseException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    public static void main(String[] args) {
+        ServerConfiguration conf = null;
+        try {
+            conf = parseArgs(args);
+        } catch (IllegalArgumentException iae) {
+            LOG.error("Error parsing command line arguments : ", iae);
+            System.err.println(iae.getMessage());
+            printUsage();
+            System.exit(ExitCode.INVALID_CONF);
+        }
+
+        try {
+            final AutoRecoveryMain autoRecoveryMain = new AutoRecoveryMain(conf);
+            autoRecoveryMain.start();
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    autoRecoveryMain.shutdown();
+                    LOG.info("Shutdown AutoRecoveryMain successfully");
+                }
+            });
+            LOG.info("Register shutdown hook successfully");
+            autoRecoveryMain.join();
+            System.exit(autoRecoveryMain.getExitCode());
+        } catch (Exception e) {
+            LOG.error("Exception running AutoRecoveryMain : ", e);
+            System.exit(ExitCode.SERVER_EXCEPTION);
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1394145&r1=1394144&r2=1394145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Thu Oct  4 16:31:09 2012
@@ -284,6 +284,9 @@ public class ReplicationWorker implement
      * Stop the replication worker service
      */
     public void shutdown() {
+        if (!workerRunning) {
+            return;
+        }
         workerRunning = false;
         try {
             underreplicationManager.close();
@@ -310,6 +313,13 @@ public class ReplicationWorker implement
         }
     }
 
+    /**
+     * Reports whether the worker's running flag is still set
+     * ({@code shutdown()} clears it).
+     * NOTE(review): polled from another thread by AutoRecovery's death
+     * watcher; confirm workerRunning is declared volatile.
+     */
+    boolean isRunning() {
+        return this.workerRunning;
+    }
+
     private boolean isTargetBookieExistsInFragmentEnsemble(LedgerHandle lh,
             LedgerFragment ledgerFragment) {
         List<InetSocketAddress> ensemble = ledgerFragment.getEnsemble();

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java?rev=1394145&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java Thu Oct  4 16:31:09 2012
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+
+/*
+ * Tests AutoRecoveryMain: startup and shutdown of the AuditorElector and
+ * ReplicationWorker daemons.
+ */
+public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
+
+    public AutoRecoveryMainTest() {
+        super(3);
+    }
+
+    /*
+     * Test the startup of the AuditorElector and ReplicationWorker.
+     */
+    public void testStartup() throws Exception {
+        AutoRecoveryMain main = new AutoRecoveryMain(bsConfs.get(0));
+        try {
+            main.start();
+            Thread.sleep(500);
+            assertTrue("AuditorElector should be running",
+                    main.auditorElector.isRunning());
+            assertTrue("Replication worker should be running",
+                    main.replicationWorker.isRunning());
+        } finally {
+            main.shutdown();
+        }
+    }
+
+    /*
+     * Test the shutdown of all daemons.
+     */
+    public void testShutdown() throws Exception {
+        AutoRecoveryMain main = new AutoRecoveryMain(bsConfs.get(0));
+        try {
+            main.start();
+            Thread.sleep(500);
+            assertTrue("AuditorElector should be running",
+                    main.auditorElector.isRunning());
+            assertTrue("Replication worker should be running",
+                    main.replicationWorker.isRunning());
+        } finally {
+            // Always release the ZK session and daemon threads, even when a
+            // startup assertion fails; repeated shutdown() calls are no-ops.
+            main.shutdown();
+        }
+        assertFalse("AuditorElector should not be running",
+                main.auditorElector.isRunning());
+        assertFalse("Replication worker should not be running",
+                main.replicationWorker.isRunning());
+    }
+}