You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/04 09:16:58 UTC
[bookkeeper] branch master updated: Introduce lifecycle components
for managing components in AutoRecovery
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3c34166 Introduce lifecycle components for managing components in AutoRecovery
3c34166 is described below
commit 3c3416669d750b70d4f9c0e3ac1c704b77b45023
Author: Charan Reddy Guttapalem <re...@gmail.com>
AuthorDate: Thu Oct 4 02:16:47 2018 -0700
Introduce lifecycle components for managing components in AutoRecovery
Descriptions of the changes in this PR:
- lifecycle components for managing components in AutoRecovery
- expose metrics of AR in the same http admin endpoint
Author: cguttapalem <cg...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1726 from reddycharan/stackar
---
bookkeeper-server/pom.xml | 12 ++
.../bookkeeper/replication/AutoRecoveryMain.java | 154 ++++++++++++++-------
.../java/org/apache/bookkeeper/server/Main.java | 2 +-
.../server/service/AutoRecoveryService.java | 11 ++
.../bookie/BookieInitializationTest.java | 130 +++++++++++++++++
5 files changed, 261 insertions(+), 48 deletions(-)
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index e154a36..e093d80 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -150,6 +150,18 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper.stats</groupId>
+ <artifactId>prometheus-metrics-provider</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper.http</groupId>
+ <artifactId>vertx-http-server</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index d63e19e..cd4aee2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -27,19 +27,26 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.net.MalformedURLException;
+import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieCriticalThread;
import org.apache.bookkeeper.bookie.ExitCode;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.component.ComponentStarter;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
+import org.apache.bookkeeper.common.component.LifecycleComponentStack;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.HttpServerLoader;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.server.http.BKHttpServiceProvider;
+import org.apache.bookkeeper.server.service.AutoRecoveryService;
+import org.apache.bookkeeper.server.service.HttpService;
+import org.apache.bookkeeper.server.service.StatsProviderService;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.cli.BasicParser;
@@ -70,6 +77,9 @@ public class AutoRecoveryMain {
private volatile boolean shuttingDown = false;
private volatile boolean running = false;
+ // Exception handler
+ private volatile UncaughtExceptionHandler uncaughtExceptionHandler = null;
+
public AutoRecoveryMain(ServerConfiguration conf) throws IOException,
InterruptedException, KeeperException, UnavailableException,
CompatibilityException {
@@ -102,6 +112,9 @@ public class AutoRecoveryMain {
public void start() throws UnavailableException {
auditorElector.start();
replicationWorker.start();
+ if (null != uncaughtExceptionHandler) {
+ deathWatcher.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+ }
deathWatcher.start();
running = true;
}
@@ -129,13 +142,6 @@ public class AutoRecoveryMain {
shuttingDown = true;
running = false;
this.exitCode = exitCode;
- try {
- deathWatcher.interrupt();
- deathWatcher.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted shutting down auto recovery", e);
- }
try {
auditorElector.shutdown();
@@ -158,6 +164,18 @@ public class AutoRecoveryMain {
return exitCode;
}
+ /**
+ * Currently the uncaught exception handler is used for DeathWatcher to notify
+ * lifecycle management that a bookie is dead for some reasons.
+ *
+ * <p>in future, we can register this <tt>exceptionHandler</tt> to critical threads
+ * so when those threads are dead, it will automatically trigger lifecycle management
+ * to shutdown the process.
+ */
+ public void setExceptionHandler(UncaughtExceptionHandler exceptionHandler) {
+ this.uncaughtExceptionHandler = exceptionHandler;
+ }
+
@VisibleForTesting
public Auditor getAuditor() {
return auditorElector.getAuditor();
@@ -171,7 +189,7 @@ public class AutoRecoveryMain {
/*
* DeathWatcher for AutoRecovery daemons.
*/
- private static class AutoRecoveryDeathWatcher extends BookieCriticalThread {
+ private class AutoRecoveryDeathWatcher extends BookieCriticalThread {
private int watchInterval;
private AutoRecoveryMain autoRecoveryMain;
@@ -180,6 +198,13 @@ public class AutoRecoveryMain {
+ autoRecoveryMain.conf.getBookiePort());
this.autoRecoveryMain = autoRecoveryMain;
watchInterval = autoRecoveryMain.conf.getDeathWatchInterval();
+ // set a default uncaught exception handler to shutdown the AutoRecovery
+ // when it notices the AutoRecovery is not running any more.
+ setUncaughtExceptionHandler((thread, cause) -> {
+ LOG.info("AutoRecoveryDeathWatcher exited loop due to uncaught exception from thread {}",
+ thread.getName(), cause);
+ shutdown();
+ });
}
@Override
@@ -189,13 +214,20 @@ public class AutoRecoveryMain {
Thread.sleep(watchInterval);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- break;
}
// If any one service not running, then shutdown peer.
- if (!autoRecoveryMain.auditorElector.isRunning()
- || !autoRecoveryMain.replicationWorker.isRunning()) {
- autoRecoveryMain.shutdown(ExitCode.SERVER_EXCEPTION);
- break;
+ if (!autoRecoveryMain.auditorElector.isRunning() || !autoRecoveryMain.replicationWorker.isRunning()) {
+ LOG.info(
+ "AutoRecoveryDeathWatcher noticed the AutoRecovery is not running any more,"
+ + "exiting the watch loop!");
+ /*
+ * death watcher has noticed that AutoRecovery is not
+ * running any more throw an exception to fail the death
+ * watcher thread and it will trigger the uncaught exception
+ * handler to handle this "AutoRecovery not running"
+ * situation.
+ */
+ throw new RuntimeException("AutoRecovery is not running any more");
}
}
}
@@ -266,45 +298,73 @@ public class AutoRecoveryMain {
}
public static void main(String[] args) {
- ServerConfiguration conf = null;
+ int retCode = doMain(args);
+ Runtime.getRuntime().exit(retCode);
+ }
+
+ static int doMain(String[] args) {
+
+ ServerConfiguration conf;
+
+ // 0. parse command line
try {
conf = parseArgs(args);
} catch (IllegalArgumentException iae) {
- LOG.error("Error parsing command line arguments : ", iae);
- System.err.println(iae.getMessage());
- printUsage();
- System.exit(ExitCode.INVALID_CONF);
+ return ExitCode.INVALID_CONF;
}
+ // 1. building the component stack:
+ LifecycleComponent server;
try {
- final AutoRecoveryMain autoRecoveryMain = new AutoRecoveryMain(conf);
- autoRecoveryMain.start();
- HttpServerLoader.loadHttpServer(conf);
- final HttpServer httpServer = HttpServerLoader.get();
- if (conf.isHttpServerEnabled() && httpServer != null) {
- BKHttpServiceProvider serviceProvider = new BKHttpServiceProvider.Builder()
- .setAutoRecovery(autoRecoveryMain)
- .setServerConfiguration(conf)
- .build();
- httpServer.initialize(serviceProvider);
- httpServer.startServer(conf.getHttpServerPort());
- }
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- autoRecoveryMain.shutdown();
- if (httpServer != null && httpServer.isRunning()) {
- httpServer.stopServer();
- }
- LOG.info("Shutdown AutoRecoveryMain successfully");
- }
- });
- LOG.info("Register shutdown hook successfully");
- autoRecoveryMain.join();
- System.exit(autoRecoveryMain.getExitCode());
+ server = buildAutoRecoveryServer(new BookieConfiguration(conf));
} catch (Exception e) {
- LOG.error("Exception running AutoRecoveryMain : ", e);
- System.exit(ExitCode.SERVER_EXCEPTION);
+ LOG.error("Failed to build AutoRecovery Server", e);
+ return ExitCode.SERVER_EXCEPTION;
+ }
+
+ // 2. start the server
+ try {
+ ComponentStarter.startComponent(server).get();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ // the server is interrupted
+ LOG.info("AutoRecovery server is interrupted. Exiting ...");
+ } catch (ExecutionException ee) {
+ LOG.error("Error in bookie shutdown", ee.getCause());
+ return ExitCode.SERVER_EXCEPTION;
}
+ return ExitCode.OK;
+ }
+
+ public static LifecycleComponentStack buildAutoRecoveryServer(BookieConfiguration conf) throws Exception {
+ LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
+ .withName("autorecovery-server");
+
+ // 1. build stats provider
+ StatsProviderService statsProviderService = new StatsProviderService(conf);
+ StatsLogger rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+
+ serverBuilder.addComponent(statsProviderService);
+ LOG.info("Load lifecycle component : {}", StatsProviderService.class.getName());
+
+ // 2. build AutoRecovery server
+ AutoRecoveryService autoRecoveryService = new AutoRecoveryService(conf, rootStatsLogger);
+
+ serverBuilder.addComponent(autoRecoveryService);
+ LOG.info("Load lifecycle component : {}", AutoRecoveryService.class.getName());
+
+ // 4. build http service
+ if (conf.getServerConf().isHttpServerEnabled()) {
+ BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder()
+ .setAutoRecovery(autoRecoveryService.getAutoRecoveryServer())
+ .setServerConfiguration(conf.getServerConf())
+ .setStatsProvider(statsProviderService.getStatsProvider()).build();
+ HttpService httpService = new HttpService(provider, conf, rootStatsLogger);
+
+ serverBuilder.addComponent(httpService);
+ LOG.info("Load lifecycle component : {}", HttpService.class.getName());
+ }
+
+ return serverBuilder.build();
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index ae92955..b991d31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -284,7 +284,7 @@ public class Main {
* @param conf bookie server configuration
* @return lifecycle stack
*/
- static LifecycleComponentStack buildBookieServer(BookieConfiguration conf) throws Exception {
+ public static LifecycleComponentStack buildBookieServer(BookieConfiguration conf) throws Exception {
LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder().withName("bookie-server");
// 1. build stats provider
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
index b2b8f07..f8389df 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
@@ -19,6 +19,8 @@
package org.apache.bookkeeper.server.service;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
@@ -42,6 +44,15 @@ public class AutoRecoveryService extends ServerLifecycleComponent {
}
@Override
+ public void setExceptionHandler(UncaughtExceptionHandler handler) {
+ main.setExceptionHandler(handler);
+ }
+
+ public AutoRecoveryMain getAutoRecoveryServer() {
+ return main;
+ }
+
+ @Override
protected void doStart() {
try {
this.main.start();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 48ea817..6500e0b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -33,16 +33,23 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.BindException;
import java.net.InetAddress;
+import java.net.URL;
+import java.net.URLConnection;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
@@ -55,20 +62,29 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.component.ComponentStarter;
+import org.apache.bookkeeper.common.component.Lifecycle;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.http.HttpRouter;
+import org.apache.bookkeeper.http.HttpServerLoader;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.replication.ReplicationStats;
+import org.apache.bookkeeper.server.Main;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
+import org.apache.bookkeeper.server.service.AutoRecoveryService;
import org.apache.bookkeeper.server.service.BookieService;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.tls.SecurityException;
@@ -91,6 +107,8 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
private static final Logger LOG = LoggerFactory
.getLogger(BookieInitializationTest.class);
+ private static ObjectMapper om = new ObjectMapper();
+
@Rule
public final TestName runtime = new TestName();
ZKMetadataBookieDriver driver;
@@ -495,6 +513,22 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
startFuture.get();
}
+ @Test
+ public void testAutoRecoveryServiceExceptionHandler() throws Exception {
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setMetadataServiceUri(metadataServiceUri);
+
+ BookieConfiguration bkConf = new BookieConfiguration(conf);
+ AutoRecoveryService service = new AutoRecoveryService(bkConf, NullStatsLogger.INSTANCE);
+ CompletableFuture<Void> startFuture = ComponentStarter.startComponent(service);
+
+ // shutdown the AutoRecovery service
+ service.getAutoRecoveryServer().shutdown();
+
+ // the AutoRecovery lifecycle component should be shutdown.
+ startFuture.get();
+ }
+
/**
* Verify bookie server starts up on ephemeral ports.
*/
@@ -1072,4 +1106,100 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
}
}
+ @Test
+ public void testIOVertexHTTPServerEndpointForBookieWithPrometheusProvider() throws Exception {
+ File tmpDir = createTempDir("bookie", "test");
+
+ final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+ .setJournalDirName(tmpDir.getPath()).setLedgerDirNames(new String[] { tmpDir.getPath() })
+ .setBookiePort(PortManager.nextFreePort()).setMetadataServiceUri(metadataServiceUri)
+ .setListeningInterface(null);
+
+ /*
+ * enable io.vertx http server
+ */
+ int nextFreePort = PortManager.nextFreePort();
+ conf.setStatsProviderClass(PrometheusMetricsProvider.class);
+ conf.setHttpServerEnabled(true);
+ conf.setProperty(HttpServerLoader.HTTP_SERVER_CLASS, "org.apache.bookkeeper.http.vertx.VertxHttpServer");
+ conf.setHttpServerPort(nextFreePort);
+
+ // 1. building the component stack:
+ LifecycleComponent server = Main.buildBookieServer(new BookieConfiguration(conf));
+ // 2. start the server
+ CompletableFuture<Void> stackComponentFuture = ComponentStarter.startComponent(server);
+ while (server.lifecycleState() != Lifecycle.State.STARTED) {
+ Thread.sleep(100);
+ }
+
+ // Now, hit the rest endpoint for metrics
+ URL url = new URL("http://localhost:" + nextFreePort + HttpRouter.METRICS);
+ URLConnection urlc = url.openConnection();
+ BufferedReader in = new BufferedReader(new InputStreamReader(urlc.getInputStream()));
+ String inputLine;
+ StringBuilder metricsStringBuilder = new StringBuilder();
+ while ((inputLine = in.readLine()) != null) {
+ metricsStringBuilder.append(inputLine);
+ }
+ in.close();
+ String metrics = metricsStringBuilder.toString();
+ // do primitive checks if metrics string contains some stats
+ assertTrue("Metrics should contain basic counters", metrics.contains(BookKeeperServerStats.BOOKIE_ADD_ENTRY));
+
+ // Now, hit the rest endpoint for configs
+ url = new URL("http://localhost:" + nextFreePort + HttpRouter.SERVER_CONFIG);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> configMap = om.readValue(url, Map.class);
+ if (configMap.isEmpty() || !configMap.containsKey("bookiePort")) {
+ Assert.fail("Failed to map configurations to valid JSON entries.");
+ }
+ stackComponentFuture.cancel(true);
+ }
+
+ @Test
+ public void testIOVertexHTTPServerEndpointForARWithPrometheusProvider() throws Exception {
+ final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+ .setMetadataServiceUri(metadataServiceUri).setListeningInterface(null);
+
+ /*
+ * enable io.vertx http server
+ */
+ int nextFreePort = PortManager.nextFreePort();
+ conf.setStatsProviderClass(PrometheusMetricsProvider.class);
+ conf.setHttpServerEnabled(true);
+ conf.setProperty(HttpServerLoader.HTTP_SERVER_CLASS, "org.apache.bookkeeper.http.vertx.VertxHttpServer");
+ conf.setHttpServerPort(nextFreePort);
+
+ // 1. building the component stack:
+ LifecycleComponent server = AutoRecoveryMain.buildAutoRecoveryServer(new BookieConfiguration(conf));
+ // 2. start the server
+ CompletableFuture<Void> stackComponentFuture = ComponentStarter.startComponent(server);
+ while (server.lifecycleState() != Lifecycle.State.STARTED) {
+ Thread.sleep(100);
+ }
+
+ // Now, hit the rest endpoint for metrics
+ URL url = new URL("http://localhost:" + nextFreePort + HttpRouter.METRICS);
+ URLConnection urlc = url.openConnection();
+ BufferedReader in = new BufferedReader(new InputStreamReader(urlc.getInputStream()));
+ String inputLine;
+ StringBuilder metricsStringBuilder = new StringBuilder();
+ while ((inputLine = in.readLine()) != null) {
+ metricsStringBuilder.append(inputLine);
+ }
+ in.close();
+ String metrics = metricsStringBuilder.toString();
+ // do primitive checks if metrics string contains some stats
+ assertTrue("Metrics should contain basic counters",
+ metrics.contains(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS));
+
+ // Now, hit the rest endpoint for configs
+ url = new URL("http://localhost:" + nextFreePort + HttpRouter.SERVER_CONFIG);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> configMap = om.readValue(url, Map.class);
+ if (configMap.isEmpty() || !configMap.containsKey("metadataServiceUri")) {
+ Assert.fail("Failed to map configurations to valid JSON entries.");
+ }
+ stackComponentFuture.cancel(true);
+ }
}