You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:16 UTC
[09/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
deleted file mode 100644
index 267f75a..0000000
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stats;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.health.HealthCheckRegistry;
-import com.codahale.metrics.servlets.AdminServlet;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-
-/**
- * Starts a jetty server on a configurable port to export stats.
- */
-public class ServletReporter {
-
- private final MetricRegistry metricRegistry;
- private final HealthCheckRegistry healthCheckRegistry;
- private final int port;
- private final Server jettyServer;
-
- public ServletReporter(MetricRegistry metricRegistry,
- HealthCheckRegistry healthCheckRegistry,
- int port) {
- this.metricRegistry = metricRegistry;
- this.healthCheckRegistry = healthCheckRegistry;
- this.port = port;
- this.jettyServer = new Server(port);
- }
-
- public void start() throws Exception {
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- context.setContextPath("/");
- jettyServer.setHandler(context);
-
- context.addEventListener(new HealthCheckServletContextListener(healthCheckRegistry));
- context.addEventListener(new MetricsServletContextListener(metricRegistry));
- context.addServlet(new ServletHolder(new AdminServlet()), "/*");
-
- jettyServer.start();
- }
-
- public void stop() throws Exception {
- jettyServer.stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
deleted file mode 100644
index 5bdb3ce..0000000
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Extension of {@link org.apache.bookkeeper.stats.CodahaleMetricsProvider}.
- */
-package org.apache.bookkeeper.stats;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java
deleted file mode 100644
index 96bc338..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.client.DistributedLogClientImpl;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.commons.lang3.tuple.Pair;
-
-/**
- * DistributedLog Client Related Utils.
- */
-public class ClientUtils {
-
- public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) {
- DistributedLogClientImpl clientImpl = builder.buildClient();
- return Pair.of((DistributedLogClient) clientImpl, (MonitorServiceClient) clientImpl);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
deleted file mode 100644
index 9cc085d..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.LocalDLMEmulator;
-import org.apache.distributedlog.client.routing.SingleHostRoutingService;
-import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.metadata.DLMetadata;
-import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import com.twitter.finagle.builder.Server;
-import java.io.File;
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * DistributedLog Cluster is an emulator to run distributedlog components.
- */
-public class DistributedLogCluster {
-
- private static final Logger LOG = LoggerFactory.getLogger(DistributedLogCluster.class);
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * Builder to build distributedlog cluster.
- */
- public static class Builder {
-
- int numBookies = 3;
- boolean shouldStartZK = true;
- String zkHost = "127.0.0.1";
- int zkPort = 0;
- boolean shouldStartProxy = true;
- int proxyPort = 7000;
- boolean thriftmux = false;
- DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
- .setLockTimeout(10)
- .setOutputBufferSize(0)
- .setImmediateFlushEnabled(true);
- ServerConfiguration bkConf = new ServerConfiguration();
-
- private Builder() {}
-
- /**
- * How many bookies to run. By default is 3.
- *
- * @return builder
- */
- public Builder numBookies(int numBookies) {
- this.numBookies = numBookies;
- return this;
- }
-
- /**
- * Whether to start zookeeper? By default is true.
- *
- * @param startZK
- * flag to start zookeeper?
- * @return builder
- */
- public Builder shouldStartZK(boolean startZK) {
- this.shouldStartZK = startZK;
- return this;
- }
-
- /**
- * ZooKeeper server to run. By default it runs locally on '127.0.0.1'.
- *
- * @param zkServers
- * zk servers
- * @return builder
- */
- public Builder zkServers(String zkServers) {
- this.zkHost = zkServers;
- return this;
- }
-
- /**
- * ZooKeeper server port to listen on. By default it listens on 2181.
- *
- * @param zkPort
- * zookeeper server port.
- * @return builder.
- */
- public Builder zkPort(int zkPort) {
- this.zkPort = zkPort;
- return this;
- }
-
- /**
- * Whether to start proxy or not. By default is true.
- *
- * @param startProxy
- * whether to start proxy or not.
- * @return builder
- */
- public Builder shouldStartProxy(boolean startProxy) {
- this.shouldStartProxy = startProxy;
- return this;
- }
-
- /**
- * Port that proxy server to listen on. By default is 7000.
- *
- * @param proxyPort
- * port that proxy server to listen on.
- * @return builder
- */
- public Builder proxyPort(int proxyPort) {
- this.proxyPort = proxyPort;
- return this;
- }
-
- /**
- * Set the distributedlog configuration.
- *
- * @param dlConf
- * distributedlog configuration
- * @return builder
- */
- public Builder dlConf(DistributedLogConfiguration dlConf) {
- this.dlConf = dlConf;
- return this;
- }
-
- /**
- * Set the Bookkeeper server configuration.
- *
- * @param bkConf
- * bookkeeper server configuration
- * @return builder
- */
- public Builder bkConf(ServerConfiguration bkConf) {
- this.bkConf = bkConf;
- return this;
- }
-
- /**
- * Enable thriftmux for the dl server.
- *
- * @param enabled flag to enable thriftmux
- * @return builder
- */
- public Builder thriftmux(boolean enabled) {
- this.thriftmux = enabled;
- return this;
- }
-
- public DistributedLogCluster build() throws Exception {
- // build the cluster
- return new DistributedLogCluster(
- dlConf,
- bkConf,
- numBookies,
- shouldStartZK,
- zkHost,
- zkPort,
- shouldStartProxy,
- proxyPort,
- thriftmux);
- }
- }
-
- /**
- * Run a distributedlog proxy server.
- */
- public static class DLServer {
-
- static final int MAX_RETRIES = 20;
- static final int MIN_PORT = 1025;
- static final int MAX_PORT = 65535;
-
- int proxyPort;
-
- public final InetSocketAddress address;
- public final Pair<DistributedLogServiceImpl, Server> dlServer;
- private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null);
-
- protected DLServer(DistributedLogConfiguration dlConf,
- URI uri,
- int basePort,
- boolean thriftmux) throws Exception {
- proxyPort = basePort;
-
- boolean success = false;
- int retries = 0;
- Pair<DistributedLogServiceImpl, Server> serverPair = null;
- while (!success) {
- try {
- org.apache.distributedlog.service.config.ServerConfiguration serverConf =
- new org.apache.distributedlog.service.config.ServerConfiguration();
- serverConf.loadConf(dlConf);
- serverConf.setServerShardId(proxyPort);
- serverPair = DistributedLogServer.runServer(
- serverConf,
- dlConf,
- uri,
- new IdentityStreamPartitionConverter(),
- routingService,
- new NullStatsProvider(),
- proxyPort,
- thriftmux,
- new EqualLoadAppraiser());
- routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
- routingService.startService();
- serverPair.getLeft().startPlacementPolicy();
- success = true;
- } catch (BindException be) {
- retries++;
- if (retries > MAX_RETRIES) {
- throw be;
- }
- proxyPort++;
- if (proxyPort > MAX_PORT) {
- proxyPort = MIN_PORT;
- }
- }
- }
-
- LOG.info("Running DL on port {}", proxyPort);
-
- dlServer = serverPair;
- address = DLSocketAddress.getSocketAddress(proxyPort);
- }
-
- public InetSocketAddress getAddress() {
- return address;
- }
-
- public void shutdown() {
- DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS);
- routingService.stopService();
- }
- }
-
- private final DistributedLogConfiguration dlConf;
- private final ZooKeeperServerShim zks;
- private final LocalDLMEmulator dlmEmulator;
- private DLServer dlServer;
- private final boolean shouldStartProxy;
- private final int proxyPort;
- private final boolean thriftmux;
- private final List<File> tmpDirs = new ArrayList<File>();
-
- private DistributedLogCluster(DistributedLogConfiguration dlConf,
- ServerConfiguration bkConf,
- int numBookies,
- boolean shouldStartZK,
- String zkServers,
- int zkPort,
- boolean shouldStartProxy,
- int proxyPort,
- boolean thriftmux) throws Exception {
- this.dlConf = dlConf;
- if (shouldStartZK) {
- File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
- tmpDirs.add(zkTmpDir);
- if (0 == zkPort) {
- Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
- this.zks = serverAndPort.getLeft();
- zkPort = serverAndPort.getRight();
- } else {
- this.zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
- }
- } else {
- this.zks = null;
- }
- this.dlmEmulator = LocalDLMEmulator.newBuilder()
- .numBookies(numBookies)
- .zkHost(zkServers)
- .zkPort(zkPort)
- .serverConf(bkConf)
- .shouldStartZK(false)
- .build();
- this.shouldStartProxy = shouldStartProxy;
- this.proxyPort = proxyPort;
- this.thriftmux = thriftmux;
- }
-
- public void start() throws Exception {
- this.dlmEmulator.start();
- BKDLConfig bkdlConfig = new BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl");
- DLMetadata.create(bkdlConfig).update(this.dlmEmulator.getUri());
- if (shouldStartProxy) {
- this.dlServer = new DLServer(
- dlConf,
- this.dlmEmulator.getUri(),
- proxyPort,
- thriftmux);
- } else {
- this.dlServer = null;
- }
- }
-
- public void stop() throws Exception {
- if (null != dlServer) {
- this.dlServer.shutdown();
- }
- this.dlmEmulator.teardown();
- if (null != this.zks) {
- this.zks.stop();
- }
- for (File dir : tmpDirs) {
- FileUtils.forceDeleteOnExit(dir);
- }
- }
-
- public URI getUri() {
- return this.dlmEmulator.getUri();
- }
-
- public String getZkServers() {
- return this.dlmEmulator.getZkServers();
- }
-
- public String getProxyFinagleStr() {
- return "inet!" + (dlServer == null ? "127.0.0.1:" + proxyPort : dlServer.getAddress().toString());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
deleted file mode 100644
index 81e476b..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.config.DynamicConfigurationFactory;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.service.announcer.Announcer;
-import org.apache.distributedlog.service.announcer.NOPAnnouncer;
-import org.apache.distributedlog.service.announcer.ServerSetAnnouncer;
-import org.apache.distributedlog.service.config.DefaultStreamConfigProvider;
-import org.apache.distributedlog.service.config.NullStreamConfigProvider;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.config.ServiceStreamConfigProvider;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
-import org.apache.distributedlog.service.placement.LoadAppraiser;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.SchedulerUtils;
-import com.twitter.finagle.Stack;
-import com.twitter.finagle.ThriftMuxServer$;
-import com.twitter.finagle.builder.Server;
-import com.twitter.finagle.builder.ServerBuilder;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientIdRequiredFilter;
-import com.twitter.finagle.thrift.ThriftServerFramedCodec;
-import com.twitter.finagle.transport.Transport;
-import com.twitter.util.Duration;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-
-/**
- * Running the distributedlog proxy server.
- */
-public class DistributedLogServer {
-
- private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
- private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
-
- private DistributedLogServiceImpl dlService = null;
- private Server server = null;
- private RoutingService routingService;
- private StatsProvider statsProvider;
- private Announcer announcer = null;
- private ScheduledExecutorService configExecutorService;
- private long gracefulShutdownMs = 0L;
-
- private final StatsReceiver statsReceiver;
- private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
- private final Optional<String> uri;
- private final Optional<String> conf;
- private final Optional<String> streamConf;
- private final Optional<Integer> port;
- private final Optional<Integer> statsPort;
- private final Optional<Integer> shardId;
- private final Optional<Boolean> announceServerSet;
- private final Optional<String> loadAppraiserClassStr;
- private final Optional<Boolean> thriftmux;
-
- DistributedLogServer(Optional<String> uri,
- Optional<String> conf,
- Optional<String> streamConf,
- Optional<Integer> port,
- Optional<Integer> statsPort,
- Optional<Integer> shardId,
- Optional<Boolean> announceServerSet,
- Optional<String> loadAppraiserClass,
- Optional<Boolean> thriftmux,
- RoutingService routingService,
- StatsReceiver statsReceiver,
- StatsProvider statsProvider) {
- this.uri = uri;
- this.conf = conf;
- this.streamConf = streamConf;
- this.port = port;
- this.statsPort = statsPort;
- this.shardId = shardId;
- this.announceServerSet = announceServerSet;
- this.thriftmux = thriftmux;
- this.routingService = routingService;
- this.statsReceiver = statsReceiver;
- this.statsProvider = statsProvider;
- this.loadAppraiserClassStr = loadAppraiserClass;
- }
-
- public void runServer()
- throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
- if (!uri.isPresent()) {
- throw new IllegalArgumentException("No distributedlog uri provided.");
- }
- URI dlUri = URI.create(uri.get());
- DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
- if (conf.isPresent()) {
- String configFile = conf.get();
- try {
- dlConf.loadConf(new File(configFile).toURI().toURL());
- } catch (ConfigurationException e) {
- throw new IllegalArgumentException("Failed to load distributedlog configuration from "
- + configFile + ".");
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
- + configFile + ".");
- }
- }
-
- this.configExecutorService = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder()
- .setNameFormat("DistributedLogService-Dyncfg-%d")
- .setDaemon(true)
- .build());
-
- // server configuration and dynamic configuration
- ServerConfiguration serverConf = new ServerConfiguration();
- serverConf.loadConf(dlConf);
-
- // overwrite the shard id if it is provided in the args
- if (shardId.isPresent()) {
- serverConf.setServerShardId(shardId.get());
- }
-
- serverConf.validate();
-
- DynamicDistributedLogConfiguration dynDlConf = getServiceDynConf(dlConf);
-
- logger.info("Starting stats provider : {}", statsProvider.getClass());
- statsProvider.start(dlConf);
-
- if (announceServerSet.isPresent() && announceServerSet.get()) {
- announcer = new ServerSetAnnouncer(
- dlUri,
- port.or(0),
- statsPort.or(0),
- shardId.or(0));
- } else {
- announcer = new NOPAnnouncer();
- }
-
- // Build the stream partition converter
- StreamPartitionConverter converter;
- try {
- converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
- } catch (ConfigurationException e) {
- logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
- IdentityStreamPartitionConverter.class.getName());
- converter = new IdentityStreamPartitionConverter();
- }
- Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
- LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
- logger.info("Load appraiser class is " + loadAppraiserClassStr.or("not specified.") + " Instantiated "
- + loadAppraiser.getClass().getCanonicalName());
-
- StreamConfigProvider streamConfProvider =
- getStreamConfigProvider(dlConf, converter);
-
- // pre-run
- preRun(dlConf, serverConf);
-
- Pair<DistributedLogServiceImpl, Server> serverPair = runServer(
- serverConf,
- dlConf,
- dynDlConf,
- dlUri,
- converter,
- routingService,
- statsProvider,
- port.or(0),
- keepAliveLatch,
- statsReceiver,
- thriftmux.isPresent(),
- streamConfProvider,
- loadAppraiser);
-
- this.dlService = serverPair.getLeft();
- this.server = serverPair.getRight();
-
- // announce the service
- announcer.announce();
- // start the routing service after announced
- routingService.startService();
- logger.info("Started the routing service.");
- dlService.startPlacementPolicy();
- logger.info("Started the placement policy.");
- }
-
- protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
- this.gracefulShutdownMs = serverConf.getGracefulShutdownPeriodMs();
- if (!serverConf.isDurableWriteEnabled()) {
- conf.setDurableWriteEnabled(false);
- }
- }
-
- private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf)
- throws ConfigurationException {
- Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent();
- if (conf.isPresent()) {
- DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory(
- configExecutorService, dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
- dynConf = configFactory.getDynamicConfiguration(conf.get());
- }
- if (dynConf.isPresent()) {
- return dynConf.get();
- } else {
- return ConfUtils.getConstDynConf(dlConf);
- }
- }
-
- private StreamConfigProvider getStreamConfigProvider(DistributedLogConfiguration dlConf,
- StreamPartitionConverter partitionConverter)
- throws ConfigurationException {
- StreamConfigProvider streamConfProvider = new NullStreamConfigProvider();
- if (streamConf.isPresent() && conf.isPresent()) {
- String dynConfigPath = streamConf.get();
- String defaultConfigFile = conf.get();
- streamConfProvider = new ServiceStreamConfigProvider(
- dynConfigPath,
- defaultConfigFile,
- partitionConverter,
- configExecutorService,
- dlConf.getDynamicConfigReloadIntervalSec(),
- TimeUnit.SECONDS);
- } else if (conf.isPresent()) {
- String configFile = conf.get();
- streamConfProvider = new DefaultStreamConfigProvider(configFile, configExecutorService,
- dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
- }
- return streamConfProvider;
- }
-
- static Pair<DistributedLogServiceImpl, Server> runServer(
- ServerConfiguration serverConf,
- DistributedLogConfiguration dlConf,
- URI dlUri,
- StreamPartitionConverter converter,
- RoutingService routingService,
- StatsProvider provider,
- int port,
- boolean thriftmux,
- LoadAppraiser loadAppraiser) throws IOException {
-
- return runServer(serverConf,
- dlConf,
- ConfUtils.getConstDynConf(dlConf),
- dlUri,
- converter,
- routingService,
- provider,
- port,
- new CountDownLatch(0),
- new NullStatsReceiver(),
- thriftmux,
- new NullStreamConfigProvider(),
- loadAppraiser);
- }
-
- static Pair<DistributedLogServiceImpl, Server> runServer(
- ServerConfiguration serverConf,
- DistributedLogConfiguration dlConf,
- DynamicDistributedLogConfiguration dynDlConf,
- URI dlUri,
- StreamPartitionConverter partitionConverter,
- RoutingService routingService,
- StatsProvider provider,
- int port,
- CountDownLatch keepAliveLatch,
- StatsReceiver statsReceiver,
- boolean thriftmux,
- StreamConfigProvider streamConfProvider,
- LoadAppraiser loadAppraiser) throws IOException {
- logger.info("Running server @ uri {}.", dlUri);
-
- boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled();
- StatsLogger perStreamStatsLogger;
- if (perStreamStatsEnabled) {
- perStreamStatsLogger = provider.getStatsLogger("stream");
- } else {
- perStreamStatsLogger = NullStatsLogger.INSTANCE;
- }
-
- // dl service
- DistributedLogServiceImpl dlService = new DistributedLogServiceImpl(
- serverConf,
- dlConf,
- dynDlConf,
- streamConfProvider,
- dlUri,
- partitionConverter,
- routingService,
- provider.getStatsLogger(""),
- perStreamStatsLogger,
- keepAliveLatch,
- loadAppraiser);
-
- StatsReceiver serviceStatsReceiver = statsReceiver.scope("service");
- StatsLogger serviceStatsLogger = provider.getStatsLogger("service");
-
- ServerBuilder serverBuilder = ServerBuilder.get()
- .name("DistributedLogServer")
- .codec(ThriftServerFramedCodec.get())
- .reportTo(statsReceiver)
- .keepAlive(true)
- .bindTo(new InetSocketAddress(port));
-
- if (thriftmux) {
- logger.info("Using thriftmux.");
- Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness(
- Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk();
- serverBuilder = serverBuilder.stack(
- ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
- }
-
- logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString());
-
- // starts dl server
- Server server = ServerBuilder.safeBuild(
- new ClientIdRequiredFilter<byte[], byte[]>(serviceStatsReceiver).andThen(
- new StatsFilter<byte[], byte[]>(serviceStatsLogger).andThen(
- new DistributedLogService.Service(dlService, new TBinaryProtocol.Factory()))),
- serverBuilder);
-
- logger.info("Started DistributedLog Server.");
- return Pair.of(dlService, server);
- }
-
- static void closeServer(Pair<DistributedLogServiceImpl, Server> pair,
- long gracefulShutdownPeriod,
- TimeUnit timeUnit) {
- if (null != pair.getLeft()) {
- pair.getLeft().shutdown();
- if (gracefulShutdownPeriod > 0) {
- try {
- timeUnit.sleep(gracefulShutdownPeriod);
- } catch (InterruptedException e) {
- logger.info("Interrupted on waiting service shutting down state propagated to all clients : ", e);
- }
- }
- }
- if (null != pair.getRight()) {
- logger.info("Closing dl thrift server.");
- pair.getRight().close();
- logger.info("Closed dl thrift server.");
- }
- }
-
- /**
- * Close the server.
- */
- public void close() {
- if (null != announcer) {
- try {
- announcer.unannounce();
- } catch (IOException e) {
- logger.warn("Error on unannouncing service : ", e);
- }
- announcer.close();
- }
- closeServer(Pair.of(dlService, server), gracefulShutdownMs, TimeUnit.MILLISECONDS);
- routingService.stopService();
- if (null != statsProvider) {
- statsProvider.stop();
- }
- SchedulerUtils.shutdownScheduler(configExecutorService, 60, TimeUnit.SECONDS);
- keepAliveLatch.countDown();
- }
-
- public void join() throws InterruptedException {
- keepAliveLatch.await();
- }
-
- /**
- * Running distributedlog server.
- *
- * @param uri distributedlog namespace
- * @param conf distributedlog configuration file location
- * @param streamConf per stream configuration dir location
- * @param port listen port
- * @param statsPort stats port
- * @param shardId shard id
- * @param announceServerSet whether to announce itself to server set
- * @param thriftmux flag to enable thrift mux
- * @param statsReceiver receiver to receive finagle stats
- * @param statsProvider provider to receive dl stats
- * @return distributedlog server
- * @throws ConfigurationException
- * @throws IllegalArgumentException
- * @throws IOException
- * @throws ClassNotFoundException
- */
- public static DistributedLogServer runServer(
- Optional<String> uri,
- Optional<String> conf,
- Optional<String> streamConf,
- Optional<Integer> port,
- Optional<Integer> statsPort,
- Optional<Integer> shardId,
- Optional<Boolean> announceServerSet,
- Optional<String> loadAppraiserClass,
- Optional<Boolean> thriftmux,
- RoutingService routingService,
- StatsReceiver statsReceiver,
- StatsProvider statsProvider)
- throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
-
- final DistributedLogServer server = new DistributedLogServer(
- uri,
- conf,
- streamConf,
- port,
- statsPort,
- shardId,
- announceServerSet,
- loadAppraiserClass,
- thriftmux,
- routingService,
- statsReceiver,
- statsProvider);
-
- server.runServer();
- return server;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
deleted file mode 100644
index a1642f9..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.client.routing.RoutingUtils;
-import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The launcher of the distributedlog proxy server.
- */
-public class DistributedLogServerApp {
-
- private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
-
- private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
- private final String[] args;
- private final Options options = new Options();
-
- private DistributedLogServerApp(String[] args) {
- this.args = args;
-
- // prepare options
- options.addOption("u", "uri", true, "DistributedLog URI");
- options.addOption("c", "conf", true, "DistributedLog Configuration File");
- options.addOption("sc", "stream-conf", true, "Per Stream Configuration Directory");
- options.addOption("p", "port", true, "DistributedLog Server Port");
- options.addOption("sp", "stats-port", true, "DistributedLog Stats Port");
- options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
- options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
- options.addOption("a", "announce", false, "ServerSet Path to Announce");
- options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use");
- options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
- }
-
- private void printUsage() {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp(USAGE, options);
- }
-
- private void run() {
- try {
- logger.info("Running distributedlog server : args = {}", Arrays.toString(args));
- BasicParser parser = new BasicParser();
- CommandLine cmdline = parser.parse(options, args);
- runCmd(cmdline);
- } catch (ParseException pe) {
- logger.error("Argument error : {}", pe.getMessage());
- printUsage();
- Runtime.getRuntime().exit(-1);
- } catch (IllegalArgumentException iae) {
- logger.error("Argument error : {}", iae.getMessage());
- printUsage();
- Runtime.getRuntime().exit(-1);
- } catch (ConfigurationException ce) {
- logger.error("Configuration error : {}", ce.getMessage());
- printUsage();
- Runtime.getRuntime().exit(-1);
- } catch (IOException ie) {
- logger.error("Failed to start distributedlog server : ", ie);
- Runtime.getRuntime().exit(-1);
- } catch (ClassNotFoundException cnf) {
- logger.error("Failed to start distributedlog server : ", cnf);
- Runtime.getRuntime().exit(-1);
- }
- }
-
- private void runCmd(CommandLine cmdline)
- throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
- final StatsReceiver statsReceiver = NullStatsReceiver.get();
- Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
- DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
- if (confOptional.isPresent()) {
- String configFile = confOptional.get();
- try {
- dlConf.loadConf(new File(configFile).toURI().toURL());
- } catch (ConfigurationException e) {
- throw new IllegalArgumentException("Failed to load distributedlog configuration from "
- + configFile + ".");
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
- + configFile + ".");
- }
- }
- // load the stats provider
- final StatsProvider statsProvider = getOptionalStringArg(cmdline, "pd")
- .transform(new Function<String, StatsProvider>() {
- @Nullable
- @Override
- public StatsProvider apply(@Nullable String name) {
- return ReflectionUtils.newInstance(name, StatsProvider.class);
- }
- }).or(new NullStatsProvider());
-
- final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
- checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
- URI dlUri = URI.create(uriOption.get());
-
- DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));
- RoutingService routingService = RoutingUtils.buildRoutingService(serverSet.getServerSet())
- .statsReceiver(statsReceiver.scope("routing"))
- .build();
-
- final DistributedLogServer server = DistributedLogServer.runServer(
- uriOption,
- confOptional,
- getOptionalStringArg(cmdline, "sc"),
- getOptionalIntegerArg(cmdline, "p"),
- getOptionalIntegerArg(cmdline, "sp"),
- getOptionalIntegerArg(cmdline, "si"),
- getOptionalBooleanArg(cmdline, "a"),
- getOptionalStringArg(cmdline, "la"),
- getOptionalBooleanArg(cmdline, "mx"),
- routingService,
- statsReceiver,
- statsProvider);
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- logger.info("Closing DistributedLog Server.");
- server.close();
- logger.info("Closed DistributedLog Server.");
- statsProvider.stop();
- }
- });
-
- try {
- server.join();
- } catch (InterruptedException e) {
- logger.warn("Interrupted when waiting distributedlog server to be finished : ", e);
- }
-
- logger.info("DistributedLog Service Interrupted.");
- server.close();
- logger.info("Closed DistributedLog Server.");
- statsProvider.stop();
- }
-
- public static void main(String[] args) {
- final DistributedLogServerApp launcher = new DistributedLogServerApp(args);
- launcher.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
deleted file mode 100644
index c37cd53..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
+++ /dev/null
@@ -1,794 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.net.InetSocketAddressHelper;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RegionUnavailableException;
-import org.apache.distributedlog.exceptions.ServiceUnavailableException;
-import org.apache.distributedlog.exceptions.StreamUnavailableException;
-import org.apache.distributedlog.exceptions.TooManyStreamsException;
-import org.apache.distributedlog.feature.AbstractFeatureProvider;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.rate.MovingAverageRate;
-import org.apache.distributedlog.rate.MovingAverageRateFactory;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.placement.LeastLoadPlacementPolicy;
-import org.apache.distributedlog.service.placement.LoadAppraiser;
-import org.apache.distributedlog.service.placement.PlacementPolicy;
-import org.apache.distributedlog.service.placement.ZKPlacementStateManager;
-import org.apache.distributedlog.service.stream.BulkWriteOp;
-import org.apache.distributedlog.service.stream.DeleteOp;
-import org.apache.distributedlog.service.stream.HeartbeatOp;
-import org.apache.distributedlog.service.stream.ReleaseOp;
-import org.apache.distributedlog.service.stream.Stream;
-import org.apache.distributedlog.service.stream.StreamFactory;
-import org.apache.distributedlog.service.stream.StreamFactoryImpl;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.service.stream.StreamManagerImpl;
-import org.apache.distributedlog.service.stream.StreamOp;
-import org.apache.distributedlog.service.stream.StreamOpStats;
-import org.apache.distributedlog.service.stream.TruncateOp;
-import org.apache.distributedlog.service.stream.WriteOp;
-import org.apache.distributedlog.service.stream.WriteOpWithPayload;
-import org.apache.distributedlog.service.stream.admin.CreateOp;
-import org.apache.distributedlog.service.stream.admin.StreamAdminOp;
-import org.apache.distributedlog.service.stream.limiter.ServiceRequestLimiter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.service.utils.ServerUtils;
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.ClientInfo;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import org.apache.distributedlog.thrift.service.HeartbeatOptions;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.ServerInfo;
-import org.apache.distributedlog.thrift.service.ServerStatus;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteContext;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.SchedulerUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.ScheduledThreadPoolTimer;
-import com.twitter.util.Timer;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-/**
- * Implementation of distributedlog thrift service.
- */
-public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
- FatalErrorHandler {
-
- private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
-
- private static final int MOVING_AVERAGE_WINDOW_SECS = 60;
-
- private final ServerConfiguration serverConfig;
- private final DistributedLogConfiguration dlConfig;
- private final DistributedLogNamespace dlNamespace;
- private final int serverRegionId;
- private final PlacementPolicy placementPolicy;
- private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
- private final ReentrantReadWriteLock closeLock =
- new ReentrantReadWriteLock();
- private final CountDownLatch keepAliveLatch;
- private final byte dlsnVersion;
- private final String clientId;
- private final OrderedScheduler scheduler;
- private final AccessControlManager accessControlManager;
- private final StreamConfigProvider streamConfigProvider;
- private final StreamManager streamManager;
- private final StreamFactory streamFactory;
- private final RoutingService routingService;
- private final RegionResolver regionResolver;
- private final MovingAverageRateFactory movingAvgFactory;
- private final MovingAverageRate windowedRps;
- private final MovingAverageRate windowedBps;
- private final ServiceRequestLimiter limiter;
- private final Timer timer;
- private final HashedWheelTimer requestTimer;
-
- // Features
- private final FeatureProvider featureProvider;
- private final Feature featureRegionStopAcceptNewStream;
- private final Feature featureChecksumDisabled;
- private final Feature limiterDisabledFeature;
-
- // Stats
- private final StatsLogger statsLogger;
- private final StatsLogger perStreamStatsLogger;
- private final StreamPartitionConverter streamPartitionConverter;
- private final StreamOpStats streamOpStats;
- private final Counter bulkWritePendingStat;
- private final Counter writePendingStat;
- private final Counter redirects;
- private final Counter receivedRecordCounter;
- private final StatsLogger statusCodeStatLogger;
- private final ConcurrentHashMap<StatusCode, Counter> statusCodeCounters =
- new ConcurrentHashMap<StatusCode, Counter>();
- private final Counter statusCodeTotal;
- private final Gauge<Number> proxyStatusGauge;
- private final Gauge<Number> movingAvgRpsGauge;
- private final Gauge<Number> movingAvgBpsGauge;
- private final Gauge<Number> streamAcquiredGauge;
- private final Gauge<Number> streamCachedGauge;
- private final int shard;
-
- DistributedLogServiceImpl(ServerConfiguration serverConf,
- DistributedLogConfiguration dlConf,
- DynamicDistributedLogConfiguration dynDlConf,
- StreamConfigProvider streamConfigProvider,
- URI uri,
- StreamPartitionConverter converter,
- RoutingService routingService,
- StatsLogger statsLogger,
- StatsLogger perStreamStatsLogger,
- CountDownLatch keepAliveLatch,
- LoadAppraiser loadAppraiser)
- throws IOException {
- // Configuration.
- this.serverConfig = serverConf;
- this.dlConfig = dlConf;
- this.perStreamStatsLogger = perStreamStatsLogger;
- this.dlsnVersion = serverConf.getDlsnVersion();
- this.serverRegionId = serverConf.getRegionId();
- this.streamPartitionConverter = converter;
- int serverPort = serverConf.getServerPort();
- this.shard = serverConf.getServerShardId();
- int numThreads = serverConf.getServerThreads();
- this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard);
- String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName(
- serverRegionId,
- shard,
- serverConf.isUseHostnameAsAllocatorPoolName());
- dlConf.setLedgerAllocatorPoolName(allocatorPoolName);
- this.featureProvider = AbstractFeatureProvider.getFeatureProvider("", dlConf, statsLogger.scope("features"));
- if (this.featureProvider instanceof AbstractFeatureProvider) {
- ((AbstractFeatureProvider) featureProvider).start();
- }
-
- // Build the namespace
- this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
- .conf(dlConf)
- .uri(uri)
- .statsLogger(statsLogger)
- .featureProvider(this.featureProvider)
- .clientId(clientId)
- .regionId(serverRegionId)
- .build();
- this.accessControlManager = this.dlNamespace.createAccessControlManager();
- this.keepAliveLatch = keepAliveLatch;
- this.streamConfigProvider = streamConfigProvider;
-
- // Stats pertaining to stream op execution
- this.streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
-
- // Executor Service.
- this.scheduler = OrderedScheduler.newBuilder()
- .corePoolSize(numThreads)
- .name("DistributedLogService-Executor")
- .traceTaskExecution(true)
- .statsLogger(statsLogger.scope("scheduler"))
- .build();
-
- // Timer, kept separate to ensure reliability of timeouts.
- this.requestTimer = new HashedWheelTimer(
- new ThreadFactoryBuilder().setNameFormat("DLServiceTimer-%d").build(),
- dlConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
- dlConf.getTimeoutTimerNumTicks());
-
- // Creating and managing Streams
- this.streamFactory = new StreamFactoryImpl(clientId,
- streamOpStats,
- serverConf,
- dlConf,
- featureProvider,
- streamConfigProvider,
- converter,
- dlNamespace,
- scheduler,
- this,
- requestTimer);
- this.streamManager = new StreamManagerImpl(
- clientId,
- dlConf,
- scheduler,
- streamFactory,
- converter,
- streamConfigProvider,
- dlNamespace);
- this.routingService = routingService;
- this.regionResolver = new DefaultRegionResolver();
-
- // Service features
- this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature(
- ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase());
- this.featureChecksumDisabled = this.featureProvider.getFeature(
- ServerFeatureKeys.SERVICE_CHECKSUM_DISABLED.name().toLowerCase());
- this.limiterDisabledFeature = this.featureProvider.getFeature(
- ServerFeatureKeys.SERVICE_GLOBAL_LIMITER_DISABLED.name().toLowerCase());
-
- // Resource limiting
- this.timer = new ScheduledThreadPoolTimer(1, "timer", true);
- this.movingAvgFactory = new MovingAverageRateFactory(timer);
- this.windowedRps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
- this.windowedBps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
- this.limiter = new ServiceRequestLimiter(
- dynDlConf,
- streamOpStats.baseScope("service_limiter"),
- windowedRps,
- windowedBps,
- streamManager,
- limiterDisabledFeature);
-
- this.placementPolicy = new LeastLoadPlacementPolicy(
- loadAppraiser,
- routingService,
- dlNamespace,
- new ZKPlacementStateManager(uri, dlConf, statsLogger),
- Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()),
- statsLogger);
- logger.info("placement started");
-
- // Stats
- this.statsLogger = statsLogger;
-
- // Gauges for server status/health
- this.proxyStatusGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable()
- ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
- }
- };
- this.movingAvgRpsGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return windowedRps.get();
- }
- };
- this.movingAvgBpsGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return windowedBps.get();
- }
- };
- // Gauges for streams
- this.streamAcquiredGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return streamManager.numAcquired();
- }
- };
- this.streamCachedGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return streamManager.numCached();
- }
- };
-
- // Stats on server
- statsLogger.registerGauge("proxy_status", proxyStatusGauge);
- // Global moving average rps
- statsLogger.registerGauge("moving_avg_rps", movingAvgRpsGauge);
- // Global moving average bps
- statsLogger.registerGauge("moving_avg_bps", movingAvgBpsGauge);
- // Stats on requests
- this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending");
- this.writePendingStat = streamOpStats.requestPendingCounter("writePending");
- this.redirects = streamOpStats.requestCounter("redirect");
- this.statusCodeStatLogger = streamOpStats.requestScope("statuscode");
- this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count");
- this.receivedRecordCounter = streamOpStats.recordsCounter("received");
-
- // Stats for streams
- StatsLogger streamsStatsLogger = statsLogger.scope("streams");
- streamsStatsLogger.registerGauge("acquired", this.streamAcquiredGauge);
- streamsStatsLogger.registerGauge("cached", this.streamCachedGauge);
-
- // Setup complete
- logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {},"
- + " dlsn version {}.",
- new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
- }
-
- private void countStatusCode(StatusCode code) {
- Counter counter = statusCodeCounters.get(code);
- if (null == counter) {
- counter = statusCodeStatLogger.getCounter(code.name());
- Counter oldCounter = statusCodeCounters.putIfAbsent(code, counter);
- if (null != oldCounter) {
- counter = oldCounter;
- }
- }
- counter.inc();
- statusCodeTotal.inc();
- }
-
- @Override
- public Future<ServerInfo> handshake() {
- return handshakeWithClientInfo(new ClientInfo());
- }
-
- @Override
- public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
- ServerInfo serverInfo = new ServerInfo();
- closeLock.readLock().lock();
- try {
- serverInfo.setServerStatus(serverStatus);
- } finally {
- closeLock.readLock().unlock();
- }
-
- if (clientInfo.isSetGetOwnerships() && !clientInfo.isGetOwnerships()) {
- return Future.value(serverInfo);
- }
-
- Optional<String> regex = Optional.absent();
- if (clientInfo.isSetStreamNameRegex()) {
- regex = Optional.of(clientInfo.getStreamNameRegex());
- }
-
- Map<String, String> ownershipMap = streamManager.getStreamOwnershipMap(regex);
- serverInfo.setOwnerships(ownershipMap);
- return Future.value(serverInfo);
- }
-
- @VisibleForTesting
- Stream getLogWriter(String stream) throws IOException {
- Stream writer = streamManager.getStream(stream);
- if (null == writer) {
- closeLock.readLock().lock();
- try {
- if (featureRegionStopAcceptNewStream.isAvailable()) {
- // accept new stream is disabled in current dc
- throw new RegionUnavailableException("Region is unavailable right now.");
- } else if (!(ServerStatus.WRITE_AND_ACCEPT == serverStatus)) {
- // if it is closed, we would not acquire stream again.
- return null;
- }
- writer = streamManager.getOrCreateStream(stream, true);
- } finally {
- closeLock.readLock().unlock();
- }
- }
- return writer;
- }
-
- // Service interface methods
-
- @Override
- public Future<WriteResponse> write(final String stream, ByteBuffer data) {
- receivedRecordCounter.inc();
- return doWrite(stream, data, null /* checksum */, false);
- }
-
- @Override
- public Future<BulkWriteResponse> writeBulkWithContext(final String stream,
- List<ByteBuffer> data,
- WriteContext ctx) {
- bulkWritePendingStat.inc();
- receivedRecordCounter.add(data.size());
- BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
- getChecksum(ctx), featureChecksumDisabled, accessControlManager);
- executeStreamOp(op);
- return op.result().ensure(new Function0<BoxedUnit>() {
- public BoxedUnit apply() {
- bulkWritePendingStat.dec();
- return null;
- }
- });
- }
-
- @Override
- public Future<WriteResponse> writeWithContext(final String stream, ByteBuffer data, WriteContext ctx) {
- return doWrite(stream, data, getChecksum(ctx), ctx.isIsRecordSet());
- }
-
- @Override
- public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
- HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx),
- featureChecksumDisabled, accessControlManager);
- executeStreamOp(op);
- return op.result();
- }
-
- @Override
- public Future<WriteResponse> heartbeatWithOptions(String stream, WriteContext ctx, HeartbeatOptions options) {
- HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx),
- featureChecksumDisabled, accessControlManager);
- if (options.isSendHeartBeatToReader()) {
- op.setWriteControlRecord(true);
- }
- executeStreamOp(op);
- return op.result();
- }
-
- @Override
- public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) {
- TruncateOp op = new TruncateOp(
- stream,
- DLSN.deserialize(dlsn),
- statsLogger,
- perStreamStatsLogger,
- getChecksum(ctx),
- featureChecksumDisabled,
- accessControlManager);
- executeStreamOp(op);
- return op.result();
- }
-
- @Override
- public Future<WriteResponse> delete(String stream, WriteContext ctx) {
- DeleteOp op = new DeleteOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx),
- featureChecksumDisabled, accessControlManager);
- executeStreamOp(op);
- return op.result();
- }
-
- @Override
- public Future<WriteResponse> release(String stream, WriteContext ctx) {
- ReleaseOp op = new ReleaseOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx),
- featureChecksumDisabled, accessControlManager);
- executeStreamOp(op);
- return op.result();
- }
-
- @Override
- public Future<WriteResponse> create(String stream, WriteContext ctx) {
- CreateOp op = new CreateOp(stream, statsLogger, streamManager, getChecksum(ctx), featureChecksumDisabled);
- return executeStreamAdminOp(op);
- }
-
- //
- // Ownership RPC
- //
-
- @Override
- public Future<WriteResponse> getOwner(String streamName, WriteContext ctx) {
- if (streamManager.isAcquired(streamName)) {
- // the stream is already acquired
- return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId)));
- }
-
- return placementPolicy.placeStream(streamName).map(new Function<String, WriteResponse>() {
- @Override
- public WriteResponse apply(String server) {
- String host = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1);
- return new WriteResponse(ResponseUtils.ownerToHeader(host));
- }
- });
- }
-
-
- //
- // Admin RPCs
- //
-
- @Override
- public Future<Void> setAcceptNewStream(boolean enabled) {
- closeLock.writeLock().lock();
- try {
- logger.info("Set AcceptNewStream = {}", enabled);
- if (ServerStatus.DOWN != serverStatus) {
- if (enabled) {
- serverStatus = ServerStatus.WRITE_AND_ACCEPT;
- } else {
- serverStatus = ServerStatus.WRITE_ONLY;
- }
- }
- } finally {
- closeLock.writeLock().unlock();
- }
- return Future.Void();
- }
-
- private Future<WriteResponse> doWrite(final String name,
- ByteBuffer data,
- Long checksum,
- boolean isRecordSet) {
- writePendingStat.inc();
- receivedRecordCounter.inc();
- WriteOp op = newWriteOp(name, data, checksum, isRecordSet);
- executeStreamOp(op);
- return op.result().ensure(new Function0<BoxedUnit>() {
- public BoxedUnit apply() {
- writePendingStat.dec();
- return null;
- }
- });
- }
-
- private Long getChecksum(WriteContext ctx) {
- return ctx.isSetCrc32() ? ctx.getCrc32() : null;
- }
-
- private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) {
- try {
- op.preExecute();
- } catch (DLException dle) {
- return Future.exception(dle);
- }
- return op.execute();
- }
-
- private void executeStreamOp(final StreamOp op) {
-
- // Must attach this as early as possible--returning before this point will cause us to
- // lose the status code.
- op.responseHeader().addEventListener(new FutureEventListener<ResponseHeader>() {
- @Override
- public void onSuccess(ResponseHeader header) {
- if (header.getLocation() != null || header.getCode() == StatusCode.FOUND) {
- redirects.inc();
- }
- countStatusCode(header.getCode());
- }
- @Override
- public void onFailure(Throwable cause) {
- }
- });
-
- try {
- // Apply the request limiter
- limiter.apply(op);
-
- // Execute per-op pre-exec code
- op.preExecute();
-
- } catch (TooManyStreamsException e) {
- // Translate to StreamUnavailableException to ensure that the client will redirect
- // to a different host. Ideally we would be able to return TooManyStreamsException,
- // but the way exception handling works right now we can't control the handling in
- // the client because client changes deploy very slowly.
- op.fail(new StreamUnavailableException(e.getMessage()));
- return;
- } catch (Exception e) {
- op.fail(e);
- return;
- }
-
- Stream stream;
- try {
- stream = getLogWriter(op.streamName());
- } catch (RegionUnavailableException rue) {
- // redirect the requests to other region
- op.fail(new RegionUnavailableException("Region " + serverRegionId + " is unavailable."));
- return;
- } catch (IOException e) {
- op.fail(e);
- return;
- }
- if (null == stream) {
- // redirect the requests when stream is unavailable.
- op.fail(new ServiceUnavailableException("Server " + clientId + " is closed."));
- return;
- }
-
- if (op instanceof WriteOpWithPayload) {
- WriteOpWithPayload writeOp = (WriteOpWithPayload) op;
- windowedBps.add(writeOp.getPayloadSize());
- windowedRps.inc();
- }
-
- stream.submit(op);
- }
-
- void shutdown() {
- try {
- closeLock.writeLock().lock();
- try {
- if (ServerStatus.DOWN == serverStatus) {
- return;
- }
- serverStatus = ServerStatus.DOWN;
- } finally {
- closeLock.writeLock().unlock();
- }
-
- streamManager.close();
- movingAvgFactory.close();
- limiter.close();
-
- Stopwatch closeStreamsStopwatch = Stopwatch.createStarted();
-
- Future<List<Void>> closeResult = streamManager.closeStreams();
- logger.info("Waiting for closing all streams ...");
- try {
- Await.result(closeResult, Duration.fromTimeUnit(5, TimeUnit.MINUTES));
- logger.info("Closed all streams in {} millis.",
- closeStreamsStopwatch.elapsed(TimeUnit.MILLISECONDS));
- } catch (InterruptedException e) {
- logger.warn("Interrupted on waiting for closing all streams : ", e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- logger.warn("Sorry, we didn't close all streams gracefully in 5 minutes : ", e);
- }
-
- // shutdown the dl namespace
- logger.info("Closing distributedlog namespace ...");
- dlNamespace.close();
- logger.info("Closed distributedlog namespace .");
-
- // Stop the feature provider
- if (this.featureProvider instanceof AbstractFeatureProvider) {
- ((AbstractFeatureProvider) featureProvider).stop();
- }
-
- // Stop the timer.
- timer.stop();
- placementPolicy.close();
-
- // clean up gauge
- unregisterGauge();
-
- // shutdown the executor after requesting closing streams.
- SchedulerUtils.shutdownScheduler(scheduler, 60, TimeUnit.SECONDS);
- } catch (Exception ex) {
- logger.info("Exception while shutting down distributedlog service.");
- } finally {
- // release the keepAliveLatch in case shutdown is called from a shutdown hook.
- keepAliveLatch.countDown();
- logger.info("Finished shutting down distributedlog service.");
- }
- }
-
- protected void startPlacementPolicy() {
- this.placementPolicy.start(shard == 0);
- }
-
- @Override
- public void notifyFatalError() {
- triggerShutdown();
- }
-
- private void triggerShutdown() {
- // release the keepAliveLatch to let the main thread shutdown the whole service.
- logger.info("Releasing KeepAlive Latch to trigger shutdown ...");
- keepAliveLatch.countDown();
- logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
- }
-
- // Test methods.
-
- private DynamicDistributedLogConfiguration getDynConf(String streamName) {
- Optional<DynamicDistributedLogConfiguration> dynDlConf =
- streamConfigProvider.getDynamicStreamConfig(streamName);
- if (dynDlConf.isPresent()) {
- return dynDlConf.get();
- } else {
- return ConfUtils.getConstDynConf(dlConfig);
- }
- }
-
- /**
- * clean up the gauge before we close to help GC.
- */
- private void unregisterGauge(){
- this.statsLogger.unregisterGauge("proxy_status", this.proxyStatusGauge);
- this.statsLogger.unregisterGauge("moving_avg_rps", this.movingAvgRpsGauge);
- this.statsLogger.unregisterGauge("moving_avg_bps", this.movingAvgBpsGauge);
- this.statsLogger.unregisterGauge("acquired", this.streamAcquiredGauge);
- this.statsLogger.unregisterGauge("cached", this.streamCachedGauge);
- }
-
- @VisibleForTesting
- Stream newStream(String name) throws IOException {
- return streamManager.getOrCreateStream(name, false);
- }
-
- @VisibleForTesting
- WriteOp newWriteOp(String stream, ByteBuffer data, Long checksum) {
- return newWriteOp(stream, data, checksum, false);
- }
-
- @VisibleForTesting
- RoutingService getRoutingService() {
- return this.routingService;
- }
-
- @VisibleForTesting
- DLSocketAddress getServiceAddress() throws IOException {
- return DLSocketAddress.deserialize(clientId);
- }
-
- WriteOp newWriteOp(String stream,
- ByteBuffer data,
- Long checksum,
- boolean isRecordSet) {
- return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
- serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled,
- accessControlManager);
- }
-
- @VisibleForTesting
- Future<List<Void>> closeStreams() {
- return streamManager.closeStreams();
- }
-
- @VisibleForTesting
- public DistributedLogNamespace getDistributedLogNamespace() {
- return dlNamespace;
- }
-
- @VisibleForTesting
- StreamManager getStreamManager() {
- return streamManager;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
deleted file mode 100644
index 17b5ab3..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-/**
- * Implement handling for an unrecoverable error.
- */
-public interface FatalErrorHandler {
-
- /**
- * This method is invoked when an unrecoverable error has occurred
- * and no progress can be made. It should implement a shutdown routine.
- */
- void notifyFatalError();
-}