You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2017/12/21 22:46:42 UTC

[GitHub] merlimat closed pull request #961: Issue 922: allow run bookie together with broker

merlimat closed pull request #961: Issue 922: allow run bookie together with broker
URL: https://github.com/apache/incubator-pulsar/pull/961
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not show its diff
after the merge, so the diff is reproduced below:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 78a0b29c5..ccdf6547e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -18,11 +18,26 @@
  */
 package org.apache.pulsar;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.configuration.PulsarConfigurationLoader.create;
 import static org.apache.pulsar.common.configuration.PulsarConfigurationLoader.isComplete;
 
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.ea.agentloader.AgentLoader;
+import java.io.File;
 import java.io.FileInputStream;
-
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.AutoRecoveryMain;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -31,8 +46,6 @@
 import org.slf4j.LoggerFactory;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 
-import com.ea.agentloader.AgentLoader;
-
 public class PulsarBrokerStarter {
 
     private static ServiceConfiguration loadConfig(String configFile) throws Exception {
@@ -44,8 +57,130 @@ private static ServiceConfiguration loadConfig(String configFile) throws Excepti
         return config;
     }
 
+    /**
+     * Command-line options for the optional embedded bookie, bound by JCommander
+     * when {@code PulsarBookieStarter} parses the arguments that follow the broker config file.
+     */
+    private static class BookieArguments {
+
+        // When set, a BookieServer is created and started alongside the broker.
+        @Parameter(names = {"-rb", "--run-bookie"}, description = "Run Bookie together with broker")
+        private boolean runBookie = false;
+
+        // When set, an AutoRecoveryMain is created and started alongside the broker.
+        @Parameter(names = {"-ra", "--run-bookie-autorecovery"}, description = "Run Bookie Autorecovery together with broker")
+        private boolean runBookieAutoRecovery = false;
+
+        // Path of the bookie configuration file; must be non-blank when either flag above is set.
+        @Parameter(names = {"-bc", "--bookie-conf"}, description = "Configuration file for Bookie")
+        private String bookieConfigFile;
+
+        // When set, the usage text is printed and the process exits.
+        @Parameter(names = {"-h", "--help"}, description = "Show this help message")
+        private boolean help = false;
+    }
+
+    /**
+     * Loads the bookie configuration from the given file and validates it.
+     *
+     * @param bookieConfigFile path of the bookie configuration file
+     * @return the loaded and validated {@link ServerConfiguration}
+     * @throws IllegalArgumentException if the path cannot be turned into a URL, or if loading or
+     *         validating the configuration fails; the underlying exception is attached as the cause
+     */
+    private static ServerConfiguration readBookieConfFile(String bookieConfigFile) throws IllegalArgumentException {
+        ServerConfiguration bookieConf = new ServerConfiguration();
+        try {
+            bookieConf.loadConf(new File(bookieConfigFile).toURI().toURL());
+            bookieConf.validate();
+            log.info("Using bookie configuration file {}", bookieConfigFile);
+        } catch (MalformedURLException e) {
+            log.error("Could not open configuration file: {}", bookieConfigFile, e);
+            // Chain the cause so the original failure is not lost to callers.
+            throw new IllegalArgumentException("Could not open configuration file", e);
+        } catch (ConfigurationException e) {
+            log.error("Malformed configuration file: {}", bookieConfigFile, e);
+            // Chain the cause so the original failure is not lost to callers.
+            throw new IllegalArgumentException("Malformed configuration file", e);
+        }
+        return bookieConf;
+    }
+
+    /**
+     * Optionally runs a BookKeeper bookie and/or bookie auto-recovery inside the broker process,
+     * as selected by the command-line arguments that follow the broker configuration file.
+     */
+    private static class PulsarBookieStarter {
+        // Each of these is null when the corresponding component was not requested.
+        private final BookieServer bookieServer;
+        private final AutoRecoveryMain autoRecoveryMain;
+        private final StatsProvider bookieStatsProvider;
+        private final ServerConfiguration bookieConfig;
+
+        /**
+         * Parses the bookie-related arguments and constructs (without starting) the requested components.
+         * When help is requested, the usage text is printed and the JVM exits with status -1.
+         *
+         * @param args command-line arguments understood by {@link BookieArguments}
+         * @throws IllegalArgumentException if a bookie component is requested without a bookie
+         *         configuration file, or if that file cannot be loaded
+         * @throws Exception if a requested component fails to initialize
+         */
+        PulsarBookieStarter(String[] args) throws Exception {
+            BookieArguments bookieArguments = new BookieArguments();
+            JCommander jcommander = new JCommander(bookieArguments);
+            jcommander.setProgramName("PulsarBrokerStarter <broker.conf>");
+
+            // parse args by jcommander
+            jcommander.parse(args);
+            if (bookieArguments.help) {
+                jcommander.usage();
+                System.exit(-1);
+            }
+            if ((bookieArguments.runBookie || bookieArguments.runBookieAutoRecovery)
+                && isBlank(bookieArguments.bookieConfigFile)) {
+                jcommander.usage();
+                throw new IllegalArgumentException("No configuration file for Bookie");
+            }
+
+            // init stats provider
+            if (bookieArguments.runBookie || bookieArguments.runBookieAutoRecovery) {
+                // Defensive re-check; the guard above already rejects a blank config path.
+                checkState(isNotBlank(bookieArguments.bookieConfigFile),
+                    "No configuration file for Bookie");
+                bookieConfig = readBookieConfFile(bookieArguments.bookieConfigFile);
+                Class<? extends StatsProvider> statsProviderClass = bookieConfig.getStatsProviderClass();
+                bookieStatsProvider = ReflectionUtils.newInstance(statsProviderClass);
+            } else {
+                bookieConfig = null;
+                bookieStatsProvider = null;
+            }
+
+            // init bookie server
+            if (bookieArguments.runBookie) {
+                checkNotNull(bookieConfig, "No ServerConfiguration for Bookie");
+                checkNotNull(bookieStatsProvider, "No Stats Provider for Bookie");
+                bookieServer = new BookieServer(bookieConfig, bookieStatsProvider.getStatsLogger(""));
+            } else {
+                bookieServer = null;
+            }
+
+            // init bookie AutorecoveryMain
+            if (bookieArguments.runBookieAutoRecovery) {
+                checkNotNull(bookieConfig, "No ServerConfiguration for Bookie Autorecovery");
+                autoRecoveryMain = new AutoRecoveryMain(bookieConfig);
+            } else {
+                autoRecoveryMain = null;
+            }
+        }
+
+        /**
+         * Starts the configured components in order: stats provider, bookie server, auto-recovery.
+         * Components that were not requested are skipped.
+         *
+         * @throws Exception if any component fails to start
+         */
+        public void start() throws Exception {
+            if (bookieStatsProvider != null) {
+                bookieStatsProvider.start(bookieConfig);
+                log.info("started bookieStatsProvider.");
+            }
+            if (bookieServer != null) {
+                bookieServer.start();
+                log.info("started bookieServer.");
+            }
+            if (autoRecoveryMain != null) {
+                autoRecoveryMain.start();
+                log.info("started bookie autoRecoveryMain.");
+            }
+        }
+
+        /**
+         * Blocks until the bookie server and then auto-recovery (whichever were requested) terminate.
+         *
+         * @throws InterruptedException if the waiting thread is interrupted
+         */
+        public void join() throws InterruptedException {
+            if (bookieServer != null) {
+                bookieServer.join();
+            }
+            if (autoRecoveryMain != null) {
+                autoRecoveryMain.join();
+            }
+        }
+
+        /**
+         * Shuts the configured components down in the reverse of their start order.
+         */
+        public void shutdown() {
+            // Stop the stats provider last: the bookie server was handed its stats logger,
+            // so the provider should outlive the server rather than be stopped underneath it.
+            if (autoRecoveryMain != null) {
+                autoRecoveryMain.shutdown();
+            }
+            if (bookieServer != null) {
+                bookieServer.shutdown();
+            }
+            if (bookieStatsProvider != null) {
+                bookieStatsProvider.stop();
+            }
+        }
+    }
+
+
     public static void main(String[] args) throws Exception {
-        if (args.length != 1) {
+        if (args.length < 1) {
             throw new IllegalArgumentException("Need to specify a configuration file");
         }
 
@@ -58,10 +193,19 @@ public static void main(String[] args) throws Exception {
 
         // load aspectj-weaver agent for instrumentation
         AgentLoader.loadAgentClass(Agent.class.getName(), null);
-        
+
+        PulsarBookieStarter bookieStarter = new PulsarBookieStarter(Arrays.copyOfRange(args, 1, args.length));
+        bookieStarter.start();
+
         @SuppressWarnings("resource")
         final PulsarService service = new PulsarService(config);
-        Runtime.getRuntime().addShutdownHook(service.getShutdownService());
+        Runtime.getRuntime().addShutdownHook(
+            new Thread(() -> {
+                service.getShutdownService().run();
+                log.info("Shut down broker service successfully.");
+                bookieStarter.shutdown();
+            })
+        );
 
         try {
             service.start();
@@ -73,6 +217,8 @@ public static void main(String[] args) throws Exception {
         }
 
         service.waitUntilClosed();
+
+        bookieStarter.join();
     }
 
     private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStarter.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java
index e10d5bade..31290e42d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java
@@ -21,6 +21,7 @@
 import static org.testng.Assert.fail;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -28,6 +29,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.pulsar.PulsarBrokerStarter;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.testng.Assert;
@@ -40,15 +42,8 @@
  *          Created on Sep 6, 2012
  */
 public class PulsarBrokerStarterTest {
-    /**
-     * Tests the private static <code>loadConfig</code> method of {@link PulsarBrokerStarter} class: verifies (1) if the
-     * method returns a non-null {@link ServiceConfiguration} instance where all required settings are filled in and (2)
-     * if the property variables inside the given property file are correctly referred to that returned object.
-     */
-    @Test
-    public void testLoadConfig() throws SecurityException, NoSuchMethodException, IOException, IllegalArgumentException,
-            IllegalAccessException, InvocationTargetException {
 
+    private File createValidBrokerConfigFile() throws FileNotFoundException {
         File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
         if (testConfigFile.exists()) {
             testConfigFile.delete();
@@ -85,7 +80,19 @@ public void testLoadConfig() throws SecurityException, NoSuchMethodException, IO
 
         printWriter.close();
         testConfigFile.deleteOnExit();
+        return testConfigFile;
+    }
+
+    /**
+     * Tests the private static <code>loadConfig</code> method of {@link PulsarBrokerStarter} class: verifies (1) if the
+     * method returns a non-null {@link ServiceConfiguration} instance where all required settings are filled in and (2)
+     * if the property variables inside the given property file are correctly referred to that returned object.
+     */
+    @Test
+    public void testLoadConfig() throws SecurityException, NoSuchMethodException, IOException, IllegalArgumentException,
+            IllegalAccessException, InvocationTargetException {
 
+        File testConfigFile = createValidBrokerConfigFile();
         Method targetMethod = PulsarBrokerStarter.class.getDeclaredMethod("loadConfig", String.class);
         targetMethod.setAccessible(true);
 
@@ -270,4 +277,71 @@ public void testMainWithNoArgument() throws Exception {
             // code should reach here.
         }
     }
+
+
+    /**
+     * Passing both {@code -rb} and {@code -ra} without a bookie configuration file must make
+     * main fail with an {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testMainRunBookieAndAutoRecoveryNoConfig() throws Exception {
+        try {
+            final String brokerConfPath = createValidBrokerConfigFile().getAbsolutePath();
+            PulsarBrokerStarter.main(new String[] {brokerConfPath, "-rb", "-ra"});
+            Assert.fail("No Config file for bookie auto recovery should've raised IllegalArgumentException!");
+        } catch (IllegalArgumentException expected) {
+            // Landing in this handler is the passing path.
+            Assert.assertEquals(expected.getMessage(), "No configuration file for Bookie");
+        }
+    }
+
+    /**
+     * Passing {@code -ra} alone, without a bookie configuration file, must make main fail
+     * with an {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testMainRunBookieRecoveryNoConfig() throws Exception {
+        try {
+            final String brokerConfPath = createValidBrokerConfigFile().getAbsolutePath();
+            PulsarBrokerStarter.main(new String[] {brokerConfPath, "-ra"});
+            Assert.fail("No Config file for bookie auto recovery should've raised IllegalArgumentException!");
+        } catch (IllegalArgumentException expected) {
+            // Landing in this handler is the passing path.
+            Assert.assertEquals(expected.getMessage(), "No configuration file for Bookie");
+        }
+    }
+
+    /**
+     * Passing {@code -rb} alone, without a bookie configuration file, must make main fail
+     * with an {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testMainRunBookieNoConfig() throws Exception {
+        try {
+            final String brokerConfPath = createValidBrokerConfigFile().getAbsolutePath();
+            PulsarBrokerStarter.main(new String[] {brokerConfPath, "-rb"});
+            Assert.fail("No Config file for bookie should've raised IllegalArgumentException!");
+        } catch (IllegalArgumentException expected) {
+            // Landing in this handler is the passing path.
+            Assert.assertEquals(expected.getMessage(), "No configuration file for Bookie");
+        }
+    }
+
+    /**
+     * Verifies that the main throws {@link LedgerDirsManager.NoWritableLedgerDirException} when the file
+     * given as the bookie configuration is the broker config, i.e. effectively empty for the bookie.
+     */
+    @Test
+    public void testMainRunBookieEmptyConfig() throws Exception {
+        try {
+            final String brokerConfPath = createValidBrokerConfigFile().getAbsolutePath();
+            // The broker config doubles as the bookie config, so no ledger dirs are defined.
+            PulsarBrokerStarter.main(new String[] {brokerConfPath, "-ra", "-rb", "-bc", brokerConfPath});
+            Assert.fail("Effectively empty config file for bookie should've raised NoWritableLedgerDirException!");
+        } catch (LedgerDirsManager.NoWritableLedgerDirException expected) {
+            // Landing in this handler is the passing path: with empty ledgerDirs,
+            // bookie initialization is expected to raise this exception.
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services