You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:25 UTC
[18/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java
new file mode 100644
index 0000000..96bc338
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.client.DistributedLogClientImpl;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * DistributedLog Client Related Utils.
+ */
+public class ClientUtils {
+
+ public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) {
+ DistributedLogClientImpl clientImpl = builder.buildClient();
+ return Pair.of((DistributedLogClient) clientImpl, (MonitorServiceClient) clientImpl);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
new file mode 100644
index 0000000..9cc085d
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LocalDLMEmulator;
+import org.apache.distributedlog.client.routing.SingleHostRoutingService;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
+import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import com.twitter.finagle.builder.Server;
+import java.io.File;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DistributedLog Cluster is an emulator to run distributedlog components.
+ */
+public class DistributedLogCluster {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedLogCluster.class);
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder to build distributedlog cluster.
+ */
+ public static class Builder {
+
+ int numBookies = 3;
+ boolean shouldStartZK = true;
+ String zkHost = "127.0.0.1";
+ int zkPort = 0;
+ boolean shouldStartProxy = true;
+ int proxyPort = 7000;
+ boolean thriftmux = false;
+ DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
+ .setLockTimeout(10)
+ .setOutputBufferSize(0)
+ .setImmediateFlushEnabled(true);
+ ServerConfiguration bkConf = new ServerConfiguration();
+
+ private Builder() {}
+
+ /**
+ * How many bookies to run. By default is 3.
+ *
+ * @return builder
+ */
+ public Builder numBookies(int numBookies) {
+ this.numBookies = numBookies;
+ return this;
+ }
+
+ /**
+ * Whether to start zookeeper? By default is true.
+ *
+ * @param startZK
+ * flag to start zookeeper?
+ * @return builder
+ */
+ public Builder shouldStartZK(boolean startZK) {
+ this.shouldStartZK = startZK;
+ return this;
+ }
+
+ /**
+ * ZooKeeper server to run. By default it runs locally on '127.0.0.1'.
+ *
+ * @param zkServers
+ * zk servers
+ * @return builder
+ */
+ public Builder zkServers(String zkServers) {
+ this.zkHost = zkServers;
+ return this;
+ }
+
+ /**
+ * ZooKeeper server port to listen on. By default it listens on 2181.
+ *
+ * @param zkPort
+ * zookeeper server port.
+ * @return builder.
+ */
+ public Builder zkPort(int zkPort) {
+ this.zkPort = zkPort;
+ return this;
+ }
+
+ /**
+ * Whether to start proxy or not. By default is true.
+ *
+ * @param startProxy
+ * whether to start proxy or not.
+ * @return builder
+ */
+ public Builder shouldStartProxy(boolean startProxy) {
+ this.shouldStartProxy = startProxy;
+ return this;
+ }
+
+ /**
+ * Port that proxy server to listen on. By default is 7000.
+ *
+ * @param proxyPort
+ * port that proxy server to listen on.
+ * @return builder
+ */
+ public Builder proxyPort(int proxyPort) {
+ this.proxyPort = proxyPort;
+ return this;
+ }
+
+ /**
+ * Set the distributedlog configuration.
+ *
+ * @param dlConf
+ * distributedlog configuration
+ * @return builder
+ */
+ public Builder dlConf(DistributedLogConfiguration dlConf) {
+ this.dlConf = dlConf;
+ return this;
+ }
+
+ /**
+ * Set the Bookkeeper server configuration.
+ *
+ * @param bkConf
+ * bookkeeper server configuration
+ * @return builder
+ */
+ public Builder bkConf(ServerConfiguration bkConf) {
+ this.bkConf = bkConf;
+ return this;
+ }
+
+ /**
+ * Enable thriftmux for the dl server.
+ *
+ * @param enabled flag to enable thriftmux
+ * @return builder
+ */
+ public Builder thriftmux(boolean enabled) {
+ this.thriftmux = enabled;
+ return this;
+ }
+
+ public DistributedLogCluster build() throws Exception {
+ // build the cluster
+ return new DistributedLogCluster(
+ dlConf,
+ bkConf,
+ numBookies,
+ shouldStartZK,
+ zkHost,
+ zkPort,
+ shouldStartProxy,
+ proxyPort,
+ thriftmux);
+ }
+ }
+
+ /**
+ * Run a distributedlog proxy server.
+ */
+ public static class DLServer {
+
+ static final int MAX_RETRIES = 20;
+ static final int MIN_PORT = 1025;
+ static final int MAX_PORT = 65535;
+
+ int proxyPort;
+
+ public final InetSocketAddress address;
+ public final Pair<DistributedLogServiceImpl, Server> dlServer;
+ private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null);
+
+ protected DLServer(DistributedLogConfiguration dlConf,
+ URI uri,
+ int basePort,
+ boolean thriftmux) throws Exception {
+ proxyPort = basePort;
+
+ boolean success = false;
+ int retries = 0;
+ Pair<DistributedLogServiceImpl, Server> serverPair = null;
+ while (!success) {
+ try {
+ org.apache.distributedlog.service.config.ServerConfiguration serverConf =
+ new org.apache.distributedlog.service.config.ServerConfiguration();
+ serverConf.loadConf(dlConf);
+ serverConf.setServerShardId(proxyPort);
+ serverPair = DistributedLogServer.runServer(
+ serverConf,
+ dlConf,
+ uri,
+ new IdentityStreamPartitionConverter(),
+ routingService,
+ new NullStatsProvider(),
+ proxyPort,
+ thriftmux,
+ new EqualLoadAppraiser());
+ routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
+ routingService.startService();
+ serverPair.getLeft().startPlacementPolicy();
+ success = true;
+ } catch (BindException be) {
+ retries++;
+ if (retries > MAX_RETRIES) {
+ throw be;
+ }
+ proxyPort++;
+ if (proxyPort > MAX_PORT) {
+ proxyPort = MIN_PORT;
+ }
+ }
+ }
+
+ LOG.info("Running DL on port {}", proxyPort);
+
+ dlServer = serverPair;
+ address = DLSocketAddress.getSocketAddress(proxyPort);
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ public void shutdown() {
+ DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS);
+ routingService.stopService();
+ }
+ }
+
+ private final DistributedLogConfiguration dlConf;
+ private final ZooKeeperServerShim zks;
+ private final LocalDLMEmulator dlmEmulator;
+ private DLServer dlServer;
+ private final boolean shouldStartProxy;
+ private final int proxyPort;
+ private final boolean thriftmux;
+ private final List<File> tmpDirs = new ArrayList<File>();
+
+ private DistributedLogCluster(DistributedLogConfiguration dlConf,
+ ServerConfiguration bkConf,
+ int numBookies,
+ boolean shouldStartZK,
+ String zkServers,
+ int zkPort,
+ boolean shouldStartProxy,
+ int proxyPort,
+ boolean thriftmux) throws Exception {
+ this.dlConf = dlConf;
+ if (shouldStartZK) {
+ File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
+ tmpDirs.add(zkTmpDir);
+ if (0 == zkPort) {
+ Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
+ this.zks = serverAndPort.getLeft();
+ zkPort = serverAndPort.getRight();
+ } else {
+ this.zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
+ }
+ } else {
+ this.zks = null;
+ }
+ this.dlmEmulator = LocalDLMEmulator.newBuilder()
+ .numBookies(numBookies)
+ .zkHost(zkServers)
+ .zkPort(zkPort)
+ .serverConf(bkConf)
+ .shouldStartZK(false)
+ .build();
+ this.shouldStartProxy = shouldStartProxy;
+ this.proxyPort = proxyPort;
+ this.thriftmux = thriftmux;
+ }
+
+ public void start() throws Exception {
+ this.dlmEmulator.start();
+ BKDLConfig bkdlConfig = new BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl");
+ DLMetadata.create(bkdlConfig).update(this.dlmEmulator.getUri());
+ if (shouldStartProxy) {
+ this.dlServer = new DLServer(
+ dlConf,
+ this.dlmEmulator.getUri(),
+ proxyPort,
+ thriftmux);
+ } else {
+ this.dlServer = null;
+ }
+ }
+
+ public void stop() throws Exception {
+ if (null != dlServer) {
+ this.dlServer.shutdown();
+ }
+ this.dlmEmulator.teardown();
+ if (null != this.zks) {
+ this.zks.stop();
+ }
+ for (File dir : tmpDirs) {
+ FileUtils.forceDeleteOnExit(dir);
+ }
+ }
+
+ public URI getUri() {
+ return this.dlmEmulator.getUri();
+ }
+
+ public String getZkServers() {
+ return this.dlmEmulator.getZkServers();
+ }
+
+ public String getProxyFinagleStr() {
+ return "inet!" + (dlServer == null ? "127.0.0.1:" + proxyPort : dlServer.getAddress().toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
new file mode 100644
index 0000000..81e476b
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.config.DynamicConfigurationFactory;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.service.announcer.Announcer;
+import org.apache.distributedlog.service.announcer.NOPAnnouncer;
+import org.apache.distributedlog.service.announcer.ServerSetAnnouncer;
+import org.apache.distributedlog.service.config.DefaultStreamConfigProvider;
+import org.apache.distributedlog.service.config.NullStreamConfigProvider;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.config.ServiceStreamConfigProvider;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
+import org.apache.distributedlog.service.placement.LoadAppraiser;
+import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.SchedulerUtils;
+import com.twitter.finagle.Stack;
+import com.twitter.finagle.ThriftMuxServer$;
+import com.twitter.finagle.builder.Server;
+import com.twitter.finagle.builder.ServerBuilder;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientIdRequiredFilter;
+import com.twitter.finagle.thrift.ThriftServerFramedCodec;
+import com.twitter.finagle.transport.Transport;
+import com.twitter.util.Duration;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+
+/**
+ * Running the distributedlog proxy server.
+ */
+public class DistributedLogServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
+ private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
+
+ private DistributedLogServiceImpl dlService = null;
+ private Server server = null;
+ private RoutingService routingService;
+ private StatsProvider statsProvider;
+ private Announcer announcer = null;
+ private ScheduledExecutorService configExecutorService;
+ private long gracefulShutdownMs = 0L;
+
+ private final StatsReceiver statsReceiver;
+ private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
+ private final Optional<String> uri;
+ private final Optional<String> conf;
+ private final Optional<String> streamConf;
+ private final Optional<Integer> port;
+ private final Optional<Integer> statsPort;
+ private final Optional<Integer> shardId;
+ private final Optional<Boolean> announceServerSet;
+ private final Optional<String> loadAppraiserClassStr;
+ private final Optional<Boolean> thriftmux;
+
+ DistributedLogServer(Optional<String> uri,
+ Optional<String> conf,
+ Optional<String> streamConf,
+ Optional<Integer> port,
+ Optional<Integer> statsPort,
+ Optional<Integer> shardId,
+ Optional<Boolean> announceServerSet,
+ Optional<String> loadAppraiserClass,
+ Optional<Boolean> thriftmux,
+ RoutingService routingService,
+ StatsReceiver statsReceiver,
+ StatsProvider statsProvider) {
+ this.uri = uri;
+ this.conf = conf;
+ this.streamConf = streamConf;
+ this.port = port;
+ this.statsPort = statsPort;
+ this.shardId = shardId;
+ this.announceServerSet = announceServerSet;
+ this.thriftmux = thriftmux;
+ this.routingService = routingService;
+ this.statsReceiver = statsReceiver;
+ this.statsProvider = statsProvider;
+ this.loadAppraiserClassStr = loadAppraiserClass;
+ }
+
+ public void runServer()
+ throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
+ if (!uri.isPresent()) {
+ throw new IllegalArgumentException("No distributedlog uri provided.");
+ }
+ URI dlUri = URI.create(uri.get());
+ DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+ if (conf.isPresent()) {
+ String configFile = conf.get();
+ try {
+ dlConf.loadConf(new File(configFile).toURI().toURL());
+ } catch (ConfigurationException e) {
+ throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+ + configFile + ".");
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+ + configFile + ".");
+ }
+ }
+
+ this.configExecutorService = Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder()
+ .setNameFormat("DistributedLogService-Dyncfg-%d")
+ .setDaemon(true)
+ .build());
+
+ // server configuration and dynamic configuration
+ ServerConfiguration serverConf = new ServerConfiguration();
+ serverConf.loadConf(dlConf);
+
+ // overwrite the shard id if it is provided in the args
+ if (shardId.isPresent()) {
+ serverConf.setServerShardId(shardId.get());
+ }
+
+ serverConf.validate();
+
+ DynamicDistributedLogConfiguration dynDlConf = getServiceDynConf(dlConf);
+
+ logger.info("Starting stats provider : {}", statsProvider.getClass());
+ statsProvider.start(dlConf);
+
+ if (announceServerSet.isPresent() && announceServerSet.get()) {
+ announcer = new ServerSetAnnouncer(
+ dlUri,
+ port.or(0),
+ statsPort.or(0),
+ shardId.or(0));
+ } else {
+ announcer = new NOPAnnouncer();
+ }
+
+ // Build the stream partition converter
+ StreamPartitionConverter converter;
+ try {
+ converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
+ } catch (ConfigurationException e) {
+ logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
+ IdentityStreamPartitionConverter.class.getName());
+ converter = new IdentityStreamPartitionConverter();
+ }
+ Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
+ LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
+ logger.info("Load appraiser class is " + loadAppraiserClassStr.or("not specified.") + " Instantiated "
+ + loadAppraiser.getClass().getCanonicalName());
+
+ StreamConfigProvider streamConfProvider =
+ getStreamConfigProvider(dlConf, converter);
+
+ // pre-run
+ preRun(dlConf, serverConf);
+
+ Pair<DistributedLogServiceImpl, Server> serverPair = runServer(
+ serverConf,
+ dlConf,
+ dynDlConf,
+ dlUri,
+ converter,
+ routingService,
+ statsProvider,
+ port.or(0),
+ keepAliveLatch,
+ statsReceiver,
+ thriftmux.isPresent(),
+ streamConfProvider,
+ loadAppraiser);
+
+ this.dlService = serverPair.getLeft();
+ this.server = serverPair.getRight();
+
+ // announce the service
+ announcer.announce();
+ // start the routing service after announced
+ routingService.startService();
+ logger.info("Started the routing service.");
+ dlService.startPlacementPolicy();
+ logger.info("Started the placement policy.");
+ }
+
+ protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
+ this.gracefulShutdownMs = serverConf.getGracefulShutdownPeriodMs();
+ if (!serverConf.isDurableWriteEnabled()) {
+ conf.setDurableWriteEnabled(false);
+ }
+ }
+
+ private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf)
+ throws ConfigurationException {
+ Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent();
+ if (conf.isPresent()) {
+ DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory(
+ configExecutorService, dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
+ dynConf = configFactory.getDynamicConfiguration(conf.get());
+ }
+ if (dynConf.isPresent()) {
+ return dynConf.get();
+ } else {
+ return ConfUtils.getConstDynConf(dlConf);
+ }
+ }
+
+ private StreamConfigProvider getStreamConfigProvider(DistributedLogConfiguration dlConf,
+ StreamPartitionConverter partitionConverter)
+ throws ConfigurationException {
+ StreamConfigProvider streamConfProvider = new NullStreamConfigProvider();
+ if (streamConf.isPresent() && conf.isPresent()) {
+ String dynConfigPath = streamConf.get();
+ String defaultConfigFile = conf.get();
+ streamConfProvider = new ServiceStreamConfigProvider(
+ dynConfigPath,
+ defaultConfigFile,
+ partitionConverter,
+ configExecutorService,
+ dlConf.getDynamicConfigReloadIntervalSec(),
+ TimeUnit.SECONDS);
+ } else if (conf.isPresent()) {
+ String configFile = conf.get();
+ streamConfProvider = new DefaultStreamConfigProvider(configFile, configExecutorService,
+ dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
+ }
+ return streamConfProvider;
+ }
+
+ static Pair<DistributedLogServiceImpl, Server> runServer(
+ ServerConfiguration serverConf,
+ DistributedLogConfiguration dlConf,
+ URI dlUri,
+ StreamPartitionConverter converter,
+ RoutingService routingService,
+ StatsProvider provider,
+ int port,
+ boolean thriftmux,
+ LoadAppraiser loadAppraiser) throws IOException {
+
+ return runServer(serverConf,
+ dlConf,
+ ConfUtils.getConstDynConf(dlConf),
+ dlUri,
+ converter,
+ routingService,
+ provider,
+ port,
+ new CountDownLatch(0),
+ new NullStatsReceiver(),
+ thriftmux,
+ new NullStreamConfigProvider(),
+ loadAppraiser);
+ }
+
+ static Pair<DistributedLogServiceImpl, Server> runServer(
+ ServerConfiguration serverConf,
+ DistributedLogConfiguration dlConf,
+ DynamicDistributedLogConfiguration dynDlConf,
+ URI dlUri,
+ StreamPartitionConverter partitionConverter,
+ RoutingService routingService,
+ StatsProvider provider,
+ int port,
+ CountDownLatch keepAliveLatch,
+ StatsReceiver statsReceiver,
+ boolean thriftmux,
+ StreamConfigProvider streamConfProvider,
+ LoadAppraiser loadAppraiser) throws IOException {
+ logger.info("Running server @ uri {}.", dlUri);
+
+ boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled();
+ StatsLogger perStreamStatsLogger;
+ if (perStreamStatsEnabled) {
+ perStreamStatsLogger = provider.getStatsLogger("stream");
+ } else {
+ perStreamStatsLogger = NullStatsLogger.INSTANCE;
+ }
+
+ // dl service
+ DistributedLogServiceImpl dlService = new DistributedLogServiceImpl(
+ serverConf,
+ dlConf,
+ dynDlConf,
+ streamConfProvider,
+ dlUri,
+ partitionConverter,
+ routingService,
+ provider.getStatsLogger(""),
+ perStreamStatsLogger,
+ keepAliveLatch,
+ loadAppraiser);
+
+ StatsReceiver serviceStatsReceiver = statsReceiver.scope("service");
+ StatsLogger serviceStatsLogger = provider.getStatsLogger("service");
+
+ ServerBuilder serverBuilder = ServerBuilder.get()
+ .name("DistributedLogServer")
+ .codec(ThriftServerFramedCodec.get())
+ .reportTo(statsReceiver)
+ .keepAlive(true)
+ .bindTo(new InetSocketAddress(port));
+
+ if (thriftmux) {
+ logger.info("Using thriftmux.");
+ Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness(
+ Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk();
+ serverBuilder = serverBuilder.stack(
+ ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
+ }
+
+ logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString());
+
+ // starts dl server
+ Server server = ServerBuilder.safeBuild(
+ new ClientIdRequiredFilter<byte[], byte[]>(serviceStatsReceiver).andThen(
+ new StatsFilter<byte[], byte[]>(serviceStatsLogger).andThen(
+ new DistributedLogService.Service(dlService, new TBinaryProtocol.Factory()))),
+ serverBuilder);
+
+ logger.info("Started DistributedLog Server.");
+ return Pair.of(dlService, server);
+ }
+
+ static void closeServer(Pair<DistributedLogServiceImpl, Server> pair,
+ long gracefulShutdownPeriod,
+ TimeUnit timeUnit) {
+ if (null != pair.getLeft()) {
+ pair.getLeft().shutdown();
+ if (gracefulShutdownPeriod > 0) {
+ try {
+ timeUnit.sleep(gracefulShutdownPeriod);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted on waiting service shutting down state propagated to all clients : ", e);
+ }
+ }
+ }
+ if (null != pair.getRight()) {
+ logger.info("Closing dl thrift server.");
+ pair.getRight().close();
+ logger.info("Closed dl thrift server.");
+ }
+ }
+
+ /**
+ * Close the server.
+ */
+ public void close() {
+ if (null != announcer) {
+ try {
+ announcer.unannounce();
+ } catch (IOException e) {
+ logger.warn("Error on unannouncing service : ", e);
+ }
+ announcer.close();
+ }
+ closeServer(Pair.of(dlService, server), gracefulShutdownMs, TimeUnit.MILLISECONDS);
+ routingService.stopService();
+ if (null != statsProvider) {
+ statsProvider.stop();
+ }
+ SchedulerUtils.shutdownScheduler(configExecutorService, 60, TimeUnit.SECONDS);
+ keepAliveLatch.countDown();
+ }
+
+ public void join() throws InterruptedException {
+ keepAliveLatch.await();
+ }
+
+ /**
+ * Running distributedlog server.
+ *
+ * @param uri distributedlog namespace
+ * @param conf distributedlog configuration file location
+ * @param streamConf per stream configuration dir location
+ * @param port listen port
+ * @param statsPort stats port
+ * @param shardId shard id
+ * @param announceServerSet whether to announce itself to server set
+ * @param thriftmux flag to enable thrift mux
+ * @param statsReceiver receiver to receive finagle stats
+ * @param statsProvider provider to receive dl stats
+ * @return distributedlog server
+ * @throws ConfigurationException
+ * @throws IllegalArgumentException
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public static DistributedLogServer runServer(
+ Optional<String> uri,
+ Optional<String> conf,
+ Optional<String> streamConf,
+ Optional<Integer> port,
+ Optional<Integer> statsPort,
+ Optional<Integer> shardId,
+ Optional<Boolean> announceServerSet,
+ Optional<String> loadAppraiserClass,
+ Optional<Boolean> thriftmux,
+ RoutingService routingService,
+ StatsReceiver statsReceiver,
+ StatsProvider statsProvider)
+ throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
+
+ final DistributedLogServer server = new DistributedLogServer(
+ uri,
+ conf,
+ streamConf,
+ port,
+ statsPort,
+ shardId,
+ announceServerSet,
+ loadAppraiserClass,
+ thriftmux,
+ routingService,
+ statsReceiver,
+ statsProvider);
+
+ server.runServer();
+ return server;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
new file mode 100644
index 0000000..a1642f9
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.client.routing.RoutingUtils;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The launcher of the distributedlog proxy server.
+ */
+public class DistributedLogServerApp {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
+
+ private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
+ private final String[] args;
+ private final Options options = new Options();
+
+ private DistributedLogServerApp(String[] args) {
+ this.args = args;
+
+ // prepare options
+ options.addOption("u", "uri", true, "DistributedLog URI");
+ options.addOption("c", "conf", true, "DistributedLog Configuration File");
+ options.addOption("sc", "stream-conf", true, "Per Stream Configuration Directory");
+ options.addOption("p", "port", true, "DistributedLog Server Port");
+ options.addOption("sp", "stats-port", true, "DistributedLog Stats Port");
+ options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
+ options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
+ options.addOption("a", "announce", false, "ServerSet Path to Announce");
+ options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use");
+ options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
+ }
+
+ private void printUsage() {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.printHelp(USAGE, options);
+ }
+
+ private void run() {
+ try {
+ logger.info("Running distributedlog server : args = {}", Arrays.toString(args));
+ BasicParser parser = new BasicParser();
+ CommandLine cmdline = parser.parse(options, args);
+ runCmd(cmdline);
+ } catch (ParseException pe) {
+ logger.error("Argument error : {}", pe.getMessage());
+ printUsage();
+ Runtime.getRuntime().exit(-1);
+ } catch (IllegalArgumentException iae) {
+ logger.error("Argument error : {}", iae.getMessage());
+ printUsage();
+ Runtime.getRuntime().exit(-1);
+ } catch (ConfigurationException ce) {
+ logger.error("Configuration error : {}", ce.getMessage());
+ printUsage();
+ Runtime.getRuntime().exit(-1);
+ } catch (IOException ie) {
+ logger.error("Failed to start distributedlog server : ", ie);
+ Runtime.getRuntime().exit(-1);
+ } catch (ClassNotFoundException cnf) {
+ logger.error("Failed to start distributedlog server : ", cnf);
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+
+ private void runCmd(CommandLine cmdline)
+ throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
+ final StatsReceiver statsReceiver = NullStatsReceiver.get();
+ Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
+ DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+ if (confOptional.isPresent()) {
+ String configFile = confOptional.get();
+ try {
+ dlConf.loadConf(new File(configFile).toURI().toURL());
+ } catch (ConfigurationException e) {
+ throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+ + configFile + ".");
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+ + configFile + ".");
+ }
+ }
+ // load the stats provider
+ final StatsProvider statsProvider = getOptionalStringArg(cmdline, "pd")
+ .transform(new Function<String, StatsProvider>() {
+ @Nullable
+ @Override
+ public StatsProvider apply(@Nullable String name) {
+ return ReflectionUtils.newInstance(name, StatsProvider.class);
+ }
+ }).or(new NullStatsProvider());
+
+ final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
+ checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
+ URI dlUri = URI.create(uriOption.get());
+
+ DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));
+ RoutingService routingService = RoutingUtils.buildRoutingService(serverSet.getServerSet())
+ .statsReceiver(statsReceiver.scope("routing"))
+ .build();
+
+ final DistributedLogServer server = DistributedLogServer.runServer(
+ uriOption,
+ confOptional,
+ getOptionalStringArg(cmdline, "sc"),
+ getOptionalIntegerArg(cmdline, "p"),
+ getOptionalIntegerArg(cmdline, "sp"),
+ getOptionalIntegerArg(cmdline, "si"),
+ getOptionalBooleanArg(cmdline, "a"),
+ getOptionalStringArg(cmdline, "la"),
+ getOptionalBooleanArg(cmdline, "mx"),
+ routingService,
+ statsReceiver,
+ statsProvider);
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ logger.info("Closing DistributedLog Server.");
+ server.close();
+ logger.info("Closed DistributedLog Server.");
+ statsProvider.stop();
+ }
+ });
+
+ try {
+ server.join();
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted when waiting distributedlog server to be finished : ", e);
+ }
+
+ logger.info("DistributedLog Service Interrupted.");
+ server.close();
+ logger.info("Closed DistributedLog Server.");
+ statsProvider.stop();
+ }
+
+ public static void main(String[] args) {
+ final DistributedLogServerApp launcher = new DistributedLogServerApp(args);
+ launcher.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
new file mode 100644
index 0000000..c37cd53
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
@@ -0,0 +1,794 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.net.InetSocketAddressHelper;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.RegionUnavailableException;
+import org.apache.distributedlog.exceptions.ServiceUnavailableException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.exceptions.TooManyStreamsException;
+import org.apache.distributedlog.feature.AbstractFeatureProvider;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.rate.MovingAverageRateFactory;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.placement.LeastLoadPlacementPolicy;
+import org.apache.distributedlog.service.placement.LoadAppraiser;
+import org.apache.distributedlog.service.placement.PlacementPolicy;
+import org.apache.distributedlog.service.placement.ZKPlacementStateManager;
+import org.apache.distributedlog.service.stream.BulkWriteOp;
+import org.apache.distributedlog.service.stream.DeleteOp;
+import org.apache.distributedlog.service.stream.HeartbeatOp;
+import org.apache.distributedlog.service.stream.ReleaseOp;
+import org.apache.distributedlog.service.stream.Stream;
+import org.apache.distributedlog.service.stream.StreamFactory;
+import org.apache.distributedlog.service.stream.StreamFactoryImpl;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.service.stream.StreamManagerImpl;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.distributedlog.service.stream.StreamOpStats;
+import org.apache.distributedlog.service.stream.TruncateOp;
+import org.apache.distributedlog.service.stream.WriteOp;
+import org.apache.distributedlog.service.stream.WriteOpWithPayload;
+import org.apache.distributedlog.service.stream.admin.CreateOp;
+import org.apache.distributedlog.service.stream.admin.StreamAdminOp;
+import org.apache.distributedlog.service.stream.limiter.ServiceRequestLimiter;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.service.utils.ServerUtils;
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.ClientInfo;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import org.apache.distributedlog.thrift.service.ServerStatus;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.SchedulerUtils;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Timer;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Implementation of distributedlog thrift service.
+ */
+public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
+ FatalErrorHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
+
+ private static final int MOVING_AVERAGE_WINDOW_SECS = 60;
+
+ private final ServerConfiguration serverConfig;
+ private final DistributedLogConfiguration dlConfig;
+ private final DistributedLogNamespace dlNamespace;
+ private final int serverRegionId;
+ private final PlacementPolicy placementPolicy;
+ private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
+ private final ReentrantReadWriteLock closeLock =
+ new ReentrantReadWriteLock();
+ private final CountDownLatch keepAliveLatch;
+ private final byte dlsnVersion;
+ private final String clientId;
+ private final OrderedScheduler scheduler;
+ private final AccessControlManager accessControlManager;
+ private final StreamConfigProvider streamConfigProvider;
+ private final StreamManager streamManager;
+ private final StreamFactory streamFactory;
+ private final RoutingService routingService;
+ private final RegionResolver regionResolver;
+ private final MovingAverageRateFactory movingAvgFactory;
+ private final MovingAverageRate windowedRps;
+ private final MovingAverageRate windowedBps;
+ private final ServiceRequestLimiter limiter;
+ private final Timer timer;
+ private final HashedWheelTimer requestTimer;
+
+ // Features
+ private final FeatureProvider featureProvider;
+ private final Feature featureRegionStopAcceptNewStream;
+ private final Feature featureChecksumDisabled;
+ private final Feature limiterDisabledFeature;
+
+ // Stats
+ private final StatsLogger statsLogger;
+ private final StatsLogger perStreamStatsLogger;
+ private final StreamPartitionConverter streamPartitionConverter;
+ private final StreamOpStats streamOpStats;
+ private final Counter bulkWritePendingStat;
+ private final Counter writePendingStat;
+ private final Counter redirects;
+ private final Counter receivedRecordCounter;
+ private final StatsLogger statusCodeStatLogger;
+ private final ConcurrentHashMap<StatusCode, Counter> statusCodeCounters =
+ new ConcurrentHashMap<StatusCode, Counter>();
+ private final Counter statusCodeTotal;
+ private final Gauge<Number> proxyStatusGauge;
+ private final Gauge<Number> movingAvgRpsGauge;
+ private final Gauge<Number> movingAvgBpsGauge;
+ private final Gauge<Number> streamAcquiredGauge;
+ private final Gauge<Number> streamCachedGauge;
+ private final int shard;
+
+ DistributedLogServiceImpl(ServerConfiguration serverConf,
+ DistributedLogConfiguration dlConf,
+ DynamicDistributedLogConfiguration dynDlConf,
+ StreamConfigProvider streamConfigProvider,
+ URI uri,
+ StreamPartitionConverter converter,
+ RoutingService routingService,
+ StatsLogger statsLogger,
+ StatsLogger perStreamStatsLogger,
+ CountDownLatch keepAliveLatch,
+ LoadAppraiser loadAppraiser)
+ throws IOException {
+ // Configuration.
+ this.serverConfig = serverConf;
+ this.dlConfig = dlConf;
+ this.perStreamStatsLogger = perStreamStatsLogger;
+ this.dlsnVersion = serverConf.getDlsnVersion();
+ this.serverRegionId = serverConf.getRegionId();
+ this.streamPartitionConverter = converter;
+ int serverPort = serverConf.getServerPort();
+ this.shard = serverConf.getServerShardId();
+ int numThreads = serverConf.getServerThreads();
+ this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard);
+ String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName(
+ serverRegionId,
+ shard,
+ serverConf.isUseHostnameAsAllocatorPoolName());
+ dlConf.setLedgerAllocatorPoolName(allocatorPoolName);
+ this.featureProvider = AbstractFeatureProvider.getFeatureProvider("", dlConf, statsLogger.scope("features"));
+ if (this.featureProvider instanceof AbstractFeatureProvider) {
+ ((AbstractFeatureProvider) featureProvider).start();
+ }
+
+ // Build the namespace
+ this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
+ .conf(dlConf)
+ .uri(uri)
+ .statsLogger(statsLogger)
+ .featureProvider(this.featureProvider)
+ .clientId(clientId)
+ .regionId(serverRegionId)
+ .build();
+ this.accessControlManager = this.dlNamespace.createAccessControlManager();
+ this.keepAliveLatch = keepAliveLatch;
+ this.streamConfigProvider = streamConfigProvider;
+
+ // Stats pertaining to stream op execution
+ this.streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+
+ // Executor Service.
+ this.scheduler = OrderedScheduler.newBuilder()
+ .corePoolSize(numThreads)
+ .name("DistributedLogService-Executor")
+ .traceTaskExecution(true)
+ .statsLogger(statsLogger.scope("scheduler"))
+ .build();
+
+ // Timer, kept separate to ensure reliability of timeouts.
+ this.requestTimer = new HashedWheelTimer(
+ new ThreadFactoryBuilder().setNameFormat("DLServiceTimer-%d").build(),
+ dlConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
+ dlConf.getTimeoutTimerNumTicks());
+
+ // Creating and managing Streams
+ this.streamFactory = new StreamFactoryImpl(clientId,
+ streamOpStats,
+ serverConf,
+ dlConf,
+ featureProvider,
+ streamConfigProvider,
+ converter,
+ dlNamespace,
+ scheduler,
+ this,
+ requestTimer);
+ this.streamManager = new StreamManagerImpl(
+ clientId,
+ dlConf,
+ scheduler,
+ streamFactory,
+ converter,
+ streamConfigProvider,
+ dlNamespace);
+ this.routingService = routingService;
+ this.regionResolver = new DefaultRegionResolver();
+
+ // Service features
+ this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature(
+ ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase());
+ this.featureChecksumDisabled = this.featureProvider.getFeature(
+ ServerFeatureKeys.SERVICE_CHECKSUM_DISABLED.name().toLowerCase());
+ this.limiterDisabledFeature = this.featureProvider.getFeature(
+ ServerFeatureKeys.SERVICE_GLOBAL_LIMITER_DISABLED.name().toLowerCase());
+
+ // Resource limiting
+ this.timer = new ScheduledThreadPoolTimer(1, "timer", true);
+ this.movingAvgFactory = new MovingAverageRateFactory(timer);
+ this.windowedRps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
+ this.windowedBps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
+ this.limiter = new ServiceRequestLimiter(
+ dynDlConf,
+ streamOpStats.baseScope("service_limiter"),
+ windowedRps,
+ windowedBps,
+ streamManager,
+ limiterDisabledFeature);
+
+ this.placementPolicy = new LeastLoadPlacementPolicy(
+ loadAppraiser,
+ routingService,
+ dlNamespace,
+ new ZKPlacementStateManager(uri, dlConf, statsLogger),
+ Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()),
+ statsLogger);
+ logger.info("placement started");
+
+ // Stats
+ this.statsLogger = statsLogger;
+
+ // Gauges for server status/health
+ this.proxyStatusGauge = new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable()
+ ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
+ }
+ };
+ this.movingAvgRpsGauge = new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return windowedRps.get();
+ }
+ };
+ this.movingAvgBpsGauge = new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return windowedBps.get();
+ }
+ };
+ // Gauges for streams
+ this.streamAcquiredGauge = new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return streamManager.numAcquired();
+ }
+ };
+ this.streamCachedGauge = new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return streamManager.numCached();
+ }
+ };
+
+ // Stats on server
+ statsLogger.registerGauge("proxy_status", proxyStatusGauge);
+ // Global moving average rps
+ statsLogger.registerGauge("moving_avg_rps", movingAvgRpsGauge);
+ // Global moving average bps
+ statsLogger.registerGauge("moving_avg_bps", movingAvgBpsGauge);
+ // Stats on requests
+ this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending");
+ this.writePendingStat = streamOpStats.requestPendingCounter("writePending");
+ this.redirects = streamOpStats.requestCounter("redirect");
+ this.statusCodeStatLogger = streamOpStats.requestScope("statuscode");
+ this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count");
+ this.receivedRecordCounter = streamOpStats.recordsCounter("received");
+
+ // Stats for streams
+ StatsLogger streamsStatsLogger = statsLogger.scope("streams");
+ streamsStatsLogger.registerGauge("acquired", this.streamAcquiredGauge);
+ streamsStatsLogger.registerGauge("cached", this.streamCachedGauge);
+
+ // Setup complete
+ logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {},"
+ + " dlsn version {}.",
+ new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
+ }
+
+ private void countStatusCode(StatusCode code) {
+ Counter counter = statusCodeCounters.get(code);
+ if (null == counter) {
+ counter = statusCodeStatLogger.getCounter(code.name());
+ Counter oldCounter = statusCodeCounters.putIfAbsent(code, counter);
+ if (null != oldCounter) {
+ counter = oldCounter;
+ }
+ }
+ counter.inc();
+ statusCodeTotal.inc();
+ }
+
+ @Override
+ public Future<ServerInfo> handshake() {
+ return handshakeWithClientInfo(new ClientInfo());
+ }
+
+ @Override
+ public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
+ ServerInfo serverInfo = new ServerInfo();
+ closeLock.readLock().lock();
+ try {
+ serverInfo.setServerStatus(serverStatus);
+ } finally {
+ closeLock.readLock().unlock();
+ }
+
+ if (clientInfo.isSetGetOwnerships() && !clientInfo.isGetOwnerships()) {
+ return Future.value(serverInfo);
+ }
+
+ Optional<String> regex = Optional.absent();
+ if (clientInfo.isSetStreamNameRegex()) {
+ regex = Optional.of(clientInfo.getStreamNameRegex());
+ }
+
+ Map<String, String> ownershipMap = streamManager.getStreamOwnershipMap(regex);
+ serverInfo.setOwnerships(ownershipMap);
+ return Future.value(serverInfo);
+ }
+
+ @VisibleForTesting
+ Stream getLogWriter(String stream) throws IOException {
+ Stream writer = streamManager.getStream(stream);
+ if (null == writer) {
+ closeLock.readLock().lock();
+ try {
+ if (featureRegionStopAcceptNewStream.isAvailable()) {
+ // accept new stream is disabled in current dc
+ throw new RegionUnavailableException("Region is unavailable right now.");
+ } else if (!(ServerStatus.WRITE_AND_ACCEPT == serverStatus)) {
+ // if it is closed, we would not acquire stream again.
+ return null;
+ }
+ writer = streamManager.getOrCreateStream(stream, true);
+ } finally {
+ closeLock.readLock().unlock();
+ }
+ }
+ return writer;
+ }
+
+ // Service interface methods
+
+ @Override
+ public Future<WriteResponse> write(final String stream, ByteBuffer data) {
+ receivedRecordCounter.inc();
+ return doWrite(stream, data, null /* checksum */, false);
+ }
+
+ @Override
+ public Future<BulkWriteResponse> writeBulkWithContext(final String stream,
+ List<ByteBuffer> data,
+ WriteContext ctx) {
+ bulkWritePendingStat.inc();
+ receivedRecordCounter.add(data.size());
+ BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
+ getChecksum(ctx), featureChecksumDisabled, accessControlManager);
+ executeStreamOp(op);
+ return op.result().ensure(new Function0<BoxedUnit>() {
+ public BoxedUnit apply() {
+ bulkWritePendingStat.dec();
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public Future<WriteResponse> writeWithContext(final String stream, ByteBuffer data, WriteContext ctx) {
+ return doWrite(stream, data, getChecksum(ctx), ctx.isIsRecordSet());
+ }
+
+ @Override
+ public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
+ HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx),
+ featureChecksumDisabled, accessControlManager);
+ executeStreamOp(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<WriteResponse> heartbeatWithOptions(String stream, WriteContext ctx, HeartbeatOptions options) {
+ HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx),
+ featureChecksumDisabled, accessControlManager);
+ if (options.isSendHeartBeatToReader()) {
+ op.setWriteControlRecord(true);
+ }
+ executeStreamOp(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) {
+ TruncateOp op = new TruncateOp(
+ stream,
+ DLSN.deserialize(dlsn),
+ statsLogger,
+ perStreamStatsLogger,
+ getChecksum(ctx),
+ featureChecksumDisabled,
+ accessControlManager);
+ executeStreamOp(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<WriteResponse> delete(String stream, WriteContext ctx) {
+ DeleteOp op = new DeleteOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx),
+ featureChecksumDisabled, accessControlManager);
+ executeStreamOp(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<WriteResponse> release(String stream, WriteContext ctx) {
+ ReleaseOp op = new ReleaseOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx),
+ featureChecksumDisabled, accessControlManager);
+ executeStreamOp(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<WriteResponse> create(String stream, WriteContext ctx) {
+ CreateOp op = new CreateOp(stream, statsLogger, streamManager, getChecksum(ctx), featureChecksumDisabled);
+ return executeStreamAdminOp(op);
+ }
+
+ //
+ // Ownership RPC
+ //
+
+ @Override
+ public Future<WriteResponse> getOwner(String streamName, WriteContext ctx) {
+ if (streamManager.isAcquired(streamName)) {
+ // the stream is already acquired
+ return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId)));
+ }
+
+ return placementPolicy.placeStream(streamName).map(new Function<String, WriteResponse>() {
+ @Override
+ public WriteResponse apply(String server) {
+ String host = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1);
+ return new WriteResponse(ResponseUtils.ownerToHeader(host));
+ }
+ });
+ }
+
+
+ //
+ // Admin RPCs
+ //
+
+ @Override
+ public Future<Void> setAcceptNewStream(boolean enabled) {
+ closeLock.writeLock().lock();
+ try {
+ logger.info("Set AcceptNewStream = {}", enabled);
+ if (ServerStatus.DOWN != serverStatus) {
+ if (enabled) {
+ serverStatus = ServerStatus.WRITE_AND_ACCEPT;
+ } else {
+ serverStatus = ServerStatus.WRITE_ONLY;
+ }
+ }
+ } finally {
+ closeLock.writeLock().unlock();
+ }
+ return Future.Void();
+ }
+
+ private Future<WriteResponse> doWrite(final String name,
+ ByteBuffer data,
+ Long checksum,
+ boolean isRecordSet) {
+ writePendingStat.inc();
+ receivedRecordCounter.inc();
+ WriteOp op = newWriteOp(name, data, checksum, isRecordSet);
+ executeStreamOp(op);
+ return op.result().ensure(new Function0<BoxedUnit>() {
+ public BoxedUnit apply() {
+ writePendingStat.dec();
+ return null;
+ }
+ });
+ }
+
+ private Long getChecksum(WriteContext ctx) {
+ return ctx.isSetCrc32() ? ctx.getCrc32() : null;
+ }
+
+ private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) {
+ try {
+ op.preExecute();
+ } catch (DLException dle) {
+ return Future.exception(dle);
+ }
+ return op.execute();
+ }
+
+ private void executeStreamOp(final StreamOp op) {
+
+ // Must attach this as early as possible--returning before this point will cause us to
+ // lose the status code.
+ op.responseHeader().addEventListener(new FutureEventListener<ResponseHeader>() {
+ @Override
+ public void onSuccess(ResponseHeader header) {
+ if (header.getLocation() != null || header.getCode() == StatusCode.FOUND) {
+ redirects.inc();
+ }
+ countStatusCode(header.getCode());
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ }
+ });
+
+ try {
+ // Apply the request limiter
+ limiter.apply(op);
+
+ // Execute per-op pre-exec code
+ op.preExecute();
+
+ } catch (TooManyStreamsException e) {
+ // Translate to StreamUnavailableException to ensure that the client will redirect
+ // to a different host. Ideally we would be able to return TooManyStreamsException,
+ // but the way exception handling works right now we can't control the handling in
+ // the client because client changes deploy very slowly.
+ op.fail(new StreamUnavailableException(e.getMessage()));
+ return;
+ } catch (Exception e) {
+ op.fail(e);
+ return;
+ }
+
+ Stream stream;
+ try {
+ stream = getLogWriter(op.streamName());
+ } catch (RegionUnavailableException rue) {
+ // redirect the requests to other region
+ op.fail(new RegionUnavailableException("Region " + serverRegionId + " is unavailable."));
+ return;
+ } catch (IOException e) {
+ op.fail(e);
+ return;
+ }
+ if (null == stream) {
+ // redirect the requests when stream is unavailable.
+ op.fail(new ServiceUnavailableException("Server " + clientId + " is closed."));
+ return;
+ }
+
+ if (op instanceof WriteOpWithPayload) {
+ WriteOpWithPayload writeOp = (WriteOpWithPayload) op;
+ windowedBps.add(writeOp.getPayloadSize());
+ windowedRps.inc();
+ }
+
+ stream.submit(op);
+ }
+
+ void shutdown() {
+ try {
+ closeLock.writeLock().lock();
+ try {
+ if (ServerStatus.DOWN == serverStatus) {
+ return;
+ }
+ serverStatus = ServerStatus.DOWN;
+ } finally {
+ closeLock.writeLock().unlock();
+ }
+
+ streamManager.close();
+ movingAvgFactory.close();
+ limiter.close();
+
+ Stopwatch closeStreamsStopwatch = Stopwatch.createStarted();
+
+ Future<List<Void>> closeResult = streamManager.closeStreams();
+ logger.info("Waiting for closing all streams ...");
+ try {
+ Await.result(closeResult, Duration.fromTimeUnit(5, TimeUnit.MINUTES));
+ logger.info("Closed all streams in {} millis.",
+ closeStreamsStopwatch.elapsed(TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted on waiting for closing all streams : ", e);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ logger.warn("Sorry, we didn't close all streams gracefully in 5 minutes : ", e);
+ }
+
+ // shutdown the dl namespace
+ logger.info("Closing distributedlog namespace ...");
+ dlNamespace.close();
+ logger.info("Closed distributedlog namespace .");
+
+ // Stop the feature provider
+ if (this.featureProvider instanceof AbstractFeatureProvider) {
+ ((AbstractFeatureProvider) featureProvider).stop();
+ }
+
+ // Stop the timer.
+ timer.stop();
+ placementPolicy.close();
+
+ // clean up gauge
+ unregisterGauge();
+
+ // shutdown the executor after requesting closing streams.
+ SchedulerUtils.shutdownScheduler(scheduler, 60, TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ logger.info("Exception while shutting down distributedlog service.");
+ } finally {
+ // release the keepAliveLatch in case shutdown is called from a shutdown hook.
+ keepAliveLatch.countDown();
+ logger.info("Finished shutting down distributedlog service.");
+ }
+ }
+
+ protected void startPlacementPolicy() {
+ this.placementPolicy.start(shard == 0);
+ }
+
+ @Override
+ public void notifyFatalError() {
+ triggerShutdown();
+ }
+
+ private void triggerShutdown() {
+ // release the keepAliveLatch to let the main thread shutdown the whole service.
+ logger.info("Releasing KeepAlive Latch to trigger shutdown ...");
+ keepAliveLatch.countDown();
+ logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
+ }
+
+ // Test methods.
+
+ private DynamicDistributedLogConfiguration getDynConf(String streamName) {
+ Optional<DynamicDistributedLogConfiguration> dynDlConf =
+ streamConfigProvider.getDynamicStreamConfig(streamName);
+ if (dynDlConf.isPresent()) {
+ return dynDlConf.get();
+ } else {
+ return ConfUtils.getConstDynConf(dlConfig);
+ }
+ }
+
+ /**
+ * clean up the gauge before we close to help GC.
+ */
+ private void unregisterGauge(){
+ this.statsLogger.unregisterGauge("proxy_status", this.proxyStatusGauge);
+ this.statsLogger.unregisterGauge("moving_avg_rps", this.movingAvgRpsGauge);
+ this.statsLogger.unregisterGauge("moving_avg_bps", this.movingAvgBpsGauge);
+ this.statsLogger.unregisterGauge("acquired", this.streamAcquiredGauge);
+ this.statsLogger.unregisterGauge("cached", this.streamCachedGauge);
+ }
+
+ @VisibleForTesting
+ Stream newStream(String name) throws IOException {
+ return streamManager.getOrCreateStream(name, false);
+ }
+
+ @VisibleForTesting
+ WriteOp newWriteOp(String stream, ByteBuffer data, Long checksum) {
+ return newWriteOp(stream, data, checksum, false);
+ }
+
+ @VisibleForTesting
+ RoutingService getRoutingService() {
+ return this.routingService;
+ }
+
+ @VisibleForTesting
+ DLSocketAddress getServiceAddress() throws IOException {
+ return DLSocketAddress.deserialize(clientId);
+ }
+
+ WriteOp newWriteOp(String stream,
+ ByteBuffer data,
+ Long checksum,
+ boolean isRecordSet) {
+ return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
+ serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled,
+ accessControlManager);
+ }
+
+ @VisibleForTesting
+ Future<List<Void>> closeStreams() {
+ return streamManager.closeStreams();
+ }
+
+ @VisibleForTesting
+ public DistributedLogNamespace getDistributedLogNamespace() {
+ return dlNamespace;
+ }
+
+ @VisibleForTesting
+ StreamManager getStreamManager() {
+ return streamManager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
new file mode 100644
index 0000000..17b5ab3
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+/**
+ * Implement handling for an unrecoverable error.
+ */
+public interface FatalErrorHandler {
+
+ /**
+ * This method is invoked when an unrecoverable error has occurred
+ * and no progress can be made. It should implement a shutdown routine.
+ */
+ void notifyFatalError();
+}