You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:25 UTC

[18/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java
new file mode 100644
index 0000000..96bc338
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.client.DistributedLogClientImpl;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Helpers for constructing DistributedLog clients.
+ */
+public class ClientUtils {
+
+    /**
+     * Builds one client implementation and exposes it through both client interfaces.
+     *
+     * @param builder
+     *          builder used to create the underlying client implementation
+     * @return pair whose left and right elements are the same client object, viewed as a
+     *         {@link DistributedLogClient} and as a {@link MonitorServiceClient} respectively
+     */
+    public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) {
+        DistributedLogClientImpl impl = builder.buildClient();
+        DistributedLogClient logClient = (DistributedLogClient) impl;
+        MonitorServiceClient monitorClient = (MonitorServiceClient) impl;
+        return Pair.of(logClient, monitorClient);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
new file mode 100644
index 0000000..9cc085d
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LocalDLMEmulator;
+import org.apache.distributedlog.client.routing.SingleHostRoutingService;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
+import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import com.twitter.finagle.builder.Server;
+import java.io.File;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DistributedLog Cluster is an emulator to run distributedlog components.
+ */
+public class DistributedLogCluster {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedLogCluster.class);
+
+    /**
+     * Create a builder used to configure and build a {@link DistributedLogCluster}.
+     *
+     * @return a new builder initialized with the default settings
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder to build distributedlog cluster.
+     *
+     * <p>Each setter stores the value and returns this builder; {@link #build()} hands the
+     * collected settings to the cluster constructor.
+     */
+    public static class Builder {
+
+        // Defaults used when the corresponding setter is never called.
+        int numBookies = 3;
+        boolean shouldStartZK = true;
+        String zkHost = "127.0.0.1";
+        // 0 asks the cluster to start zookeeper on any available port (when it starts zookeeper itself).
+        int zkPort = 0;
+        boolean shouldStartProxy = true;
+        int proxyPort = 7000;
+        boolean thriftmux = false;
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
+                .setLockTimeout(10)
+                .setOutputBufferSize(0)
+                .setImmediateFlushEnabled(true);
+        ServerConfiguration bkConf = new ServerConfiguration();
+
+        // Instances are obtained through DistributedLogCluster.newBuilder().
+        private Builder() {}
+
+        /**
+         * How many bookies to run. By default is 3.
+         *
+         * @param numBookies
+         *          number of bookies to run
+         * @return builder
+         */
+        public Builder numBookies(int numBookies) {
+            this.numBookies = numBookies;
+            return this;
+        }
+
+        /**
+         * Whether to start zookeeper. By default is true.
+         *
+         * @param startZK
+         *          flag to start zookeeper
+         * @return builder
+         */
+        public Builder shouldStartZK(boolean startZK) {
+            this.shouldStartZK = startZK;
+            return this;
+        }
+
+        /**
+         * ZooKeeper host to use. By default is '127.0.0.1'.
+         *
+         * @param zkServers
+         *          zookeeper host, stored as the zk host of the cluster
+         * @return builder
+         */
+        public Builder zkServers(String zkServers) {
+            this.zkHost = zkServers;
+            return this;
+        }
+
+        /**
+         * ZooKeeper server port to listen on. By default is 0, in which case a cluster that
+         * starts its own zookeeper runs it on any available port.
+         *
+         * @param zkPort
+         *          zookeeper server port.
+         * @return builder.
+         */
+        public Builder zkPort(int zkPort) {
+            this.zkPort = zkPort;
+            return this;
+        }
+
+        /**
+         * Whether to start proxy or not. By default is true.
+         *
+         * @param startProxy
+         *          whether to start proxy or not.
+         * @return builder
+         */
+        public Builder shouldStartProxy(boolean startProxy) {
+            this.shouldStartProxy = startProxy;
+            return this;
+        }
+
+        /**
+         * Port for the proxy server to listen on. By default is 7000.
+         *
+         * @param proxyPort
+         *          port for the proxy server to listen on.
+         * @return builder
+         */
+        public Builder proxyPort(int proxyPort) {
+            this.proxyPort = proxyPort;
+            return this;
+        }
+
+        /**
+         * Set the distributedlog configuration, replacing the builder's default one.
+         *
+         * @param dlConf
+         *          distributedlog configuration
+         * @return builder
+         */
+        public Builder dlConf(DistributedLogConfiguration dlConf) {
+            this.dlConf = dlConf;
+            return this;
+        }
+
+        /**
+         * Set the Bookkeeper server configuration.
+         *
+         * @param bkConf
+         *          bookkeeper server configuration
+         * @return builder
+         */
+        public Builder bkConf(ServerConfiguration bkConf) {
+            this.bkConf = bkConf;
+            return this;
+        }
+
+        /**
+         * Enable thriftmux for the dl server. By default is false.
+         *
+         * @param enabled flag to enable thriftmux
+         * @return builder
+         */
+        public Builder thriftmux(boolean enabled) {
+            this.thriftmux = enabled;
+            return this;
+        }
+
+        /**
+         * Build the cluster from the settings collected so far.
+         *
+         * @return the distributedlog cluster
+         * @throws Exception if the cluster constructor fails
+         */
+        public DistributedLogCluster build() throws Exception {
+            // build the cluster
+            return new DistributedLogCluster(
+                dlConf,
+                bkConf,
+                numBookies,
+                shouldStartZK,
+                zkHost,
+                zkPort,
+                shouldStartProxy,
+                proxyPort,
+                thriftmux);
+        }
+    }
+
+    /**
+     * A distributedlog proxy server running in this process, together with the
+     * single-host routing service that points at it.
+     */
+    public static class DLServer {
+
+        static final int MAX_RETRIES = 20;
+        static final int MIN_PORT = 1025;
+        static final int MAX_PORT = 65535;
+
+        int proxyPort;
+
+        public final InetSocketAddress address;
+        public final Pair<DistributedLogServiceImpl, Server> dlServer;
+        private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null);
+
+        /**
+         * Starts the proxy server, beginning at <code>basePort</code> and moving to the next
+         * port whenever a {@link BindException} is raised.
+         *
+         * @param dlConf distributedlog configuration
+         * @param uri distributedlog namespace uri
+         * @param basePort first port to try
+         * @param thriftmux whether to enable thriftmux
+         * @throws Exception the last {@link BindException} once more than
+         *          {@link #MAX_RETRIES} attempts have failed, or any other startup failure
+         */
+        protected DLServer(DistributedLogConfiguration dlConf,
+                           URI uri,
+                           int basePort,
+                           boolean thriftmux) throws Exception {
+            proxyPort = basePort;
+
+            Pair<DistributedLogServiceImpl, Server> started = null;
+            int bindFailures = 0;
+            for (;;) {
+                try {
+                    org.apache.distributedlog.service.config.ServerConfiguration proxyConf =
+                            new org.apache.distributedlog.service.config.ServerConfiguration();
+                    proxyConf.loadConf(dlConf);
+                    // the port being tried doubles as the shard id
+                    proxyConf.setServerShardId(proxyPort);
+                    started = DistributedLogServer.runServer(
+                            proxyConf,
+                            dlConf,
+                            uri,
+                            new IdentityStreamPartitionConverter(),
+                            routingService,
+                            new NullStatsProvider(),
+                            proxyPort,
+                            thriftmux,
+                            new EqualLoadAppraiser());
+                    routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
+                    routingService.startService();
+                    started.getLeft().startPlacementPolicy();
+                    // everything came up on this port
+                    break;
+                } catch (BindException be) {
+                    bindFailures++;
+                    if (bindFailures > MAX_RETRIES) {
+                        throw be;
+                    }
+                    // try the next port, wrapping around at the top of the range
+                    if (++proxyPort > MAX_PORT) {
+                        proxyPort = MIN_PORT;
+                    }
+                }
+            }
+
+            LOG.info("Running DL on port {}", proxyPort);
+
+            dlServer = started;
+            address = DLSocketAddress.getSocketAddress(proxyPort);
+        }
+
+        /**
+         * @return the socket address computed for the port the proxy ended up on
+         */
+        public InetSocketAddress getAddress() {
+            return address;
+        }
+
+        /**
+         * Closes the proxy server without a graceful period, then stops the routing service.
+         */
+        public void shutdown() {
+            DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS);
+            routingService.stopService();
+        }
+    }
+
+    private final DistributedLogConfiguration dlConf;
+    private final ZooKeeperServerShim zks;
+    private final LocalDLMEmulator dlmEmulator;
+    private DLServer dlServer;
+    private final boolean shouldStartProxy;
+    private final int proxyPort;
+    private final boolean thriftmux;
+    private final List<File> tmpDirs = new ArrayList<File>();
+
+    /**
+     * Creates the cluster: optionally starts a local zookeeper server, then builds (but does
+     * not start) the DLM emulator that runs the bookies.
+     *
+     * @param dlConf distributedlog configuration used by the proxy
+     * @param bkConf bookkeeper server configuration handed to the emulator
+     * @param numBookies number of bookies
+     * @param shouldStartZK whether this cluster starts its own zookeeper server
+     * @param zkServers zookeeper host handed to the emulator
+     * @param zkPort zookeeper port; 0 means any port when zookeeper is started here
+     * @param shouldStartProxy whether {@link #start()} launches the proxy server
+     * @param proxyPort base port for the proxy server
+     * @param thriftmux whether to enable thriftmux on the proxy server
+     * @throws Exception if zookeeper or the emulator cannot be set up
+     */
+    private DistributedLogCluster(DistributedLogConfiguration dlConf,
+                                  ServerConfiguration bkConf,
+                                  int numBookies,
+                                  boolean shouldStartZK,
+                                  String zkServers,
+                                  int zkPort,
+                                  boolean shouldStartProxy,
+                                  int proxyPort,
+                                  boolean thriftmux) throws Exception {
+        this.dlConf = dlConf;
+        this.shouldStartProxy = shouldStartProxy;
+        this.proxyPort = proxyPort;
+        this.thriftmux = thriftmux;
+
+        ZooKeeperServerShim localZk = null;
+        int resolvedZkPort = zkPort;
+        if (shouldStartZK) {
+            File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
+            tmpDirs.add(zkTmpDir);
+            if (0 != zkPort) {
+                localZk = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
+            } else {
+                // no port requested: let the emulator pick one and remember which
+                Pair<ZooKeeperServerShim, Integer> zkAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
+                localZk = zkAndPort.getLeft();
+                resolvedZkPort = zkAndPort.getRight();
+            }
+        }
+        this.zks = localZk;
+
+        // zookeeper (if any) is managed above, so the emulator must not start its own
+        this.dlmEmulator = LocalDLMEmulator.newBuilder()
+                .numBookies(numBookies)
+                .zkHost(zkServers)
+                .zkPort(resolvedZkPort)
+                .serverConf(bkConf)
+                .shouldStartZK(false)
+                .build();
+    }
+
+    /**
+     * Starts the emulator, writes the namespace metadata binding, and launches the proxy
+     * server when the cluster was configured to run one.
+     *
+     * @throws Exception if any component fails to start
+     */
+    public void start() throws Exception {
+        dlmEmulator.start();
+
+        String zkServers = dlmEmulator.getZkServers();
+        BKDLConfig bkdlConfig = new BKDLConfig(zkServers, "/ledgers").setACLRootPath(".acl");
+        DLMetadata.create(bkdlConfig).update(dlmEmulator.getUri());
+
+        if (!shouldStartProxy) {
+            dlServer = null;
+            return;
+        }
+        dlServer = new DLServer(
+                dlConf,
+                dlmEmulator.getUri(),
+                proxyPort,
+                thriftmux);
+    }
+
+    /**
+     * Stops the proxy server (if one was started), tears down the emulator, stops the
+     * zookeeper server (if this cluster started one) and schedules the temporary
+     * directories for deletion on JVM exit.
+     *
+     * <p>Each step runs even when an earlier one throws, so a failure while shutting down
+     * one component does not leave the remaining components running.
+     *
+     * @throws Exception if any step fails; when several steps fail, the exception of the
+     *          latest failing step is the one propagated
+     */
+    public void stop() throws Exception {
+        try {
+            if (null != dlServer) {
+                this.dlServer.shutdown();
+            }
+        } finally {
+            try {
+                this.dlmEmulator.teardown();
+            } finally {
+                try {
+                    if (null != this.zks) {
+                        this.zks.stop();
+                    }
+                } finally {
+                    for (File dir : tmpDirs) {
+                        FileUtils.forceDeleteOnExit(dir);
+                    }
+                }
+            }
+        }
+    }
+
+    public URI getUri() {
+        return this.dlmEmulator.getUri();
+    }
+
+    public String getZkServers() {
+        return this.dlmEmulator.getZkServers();
+    }
+
+    public String getProxyFinagleStr() {
+        return "inet!" + (dlServer == null ? "127.0.0.1:" + proxyPort : dlServer.getAddress().toString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
new file mode 100644
index 0000000..81e476b
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.config.DynamicConfigurationFactory;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.service.announcer.Announcer;
+import org.apache.distributedlog.service.announcer.NOPAnnouncer;
+import org.apache.distributedlog.service.announcer.ServerSetAnnouncer;
+import org.apache.distributedlog.service.config.DefaultStreamConfigProvider;
+import org.apache.distributedlog.service.config.NullStreamConfigProvider;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.config.ServiceStreamConfigProvider;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
+import org.apache.distributedlog.service.placement.LoadAppraiser;
+import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.SchedulerUtils;
+import com.twitter.finagle.Stack;
+import com.twitter.finagle.ThriftMuxServer$;
+import com.twitter.finagle.builder.Server;
+import com.twitter.finagle.builder.ServerBuilder;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientIdRequiredFilter;
+import com.twitter.finagle.thrift.ThriftServerFramedCodec;
+import com.twitter.finagle.transport.Transport;
+import com.twitter.util.Duration;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+
+/**
+ * Running the distributedlog proxy server.
+ */
+public class DistributedLogServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
+    private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
+
+    private DistributedLogServiceImpl dlService = null;
+    private Server server = null;
+    private RoutingService routingService;
+    private StatsProvider statsProvider;
+    private Announcer announcer = null;
+    private ScheduledExecutorService configExecutorService;
+    private long gracefulShutdownMs = 0L;
+
+    private final StatsReceiver statsReceiver;
+    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
+    private final Optional<String> uri;
+    private final Optional<String> conf;
+    private final Optional<String> streamConf;
+    private final Optional<Integer> port;
+    private final Optional<Integer> statsPort;
+    private final Optional<Integer> shardId;
+    private final Optional<Boolean> announceServerSet;
+    private final Optional<String> loadAppraiserClassStr;
+    private final Optional<Boolean> thriftmux;
+
+    /**
+     * Captures the settings of the server; nothing is started until {@link #runServer()}.
+     *
+     * @param uri distributedlog namespace
+     * @param conf distributedlog configuration file location
+     * @param streamConf per stream configuration dir location
+     * @param port listen port
+     * @param statsPort stats port
+     * @param shardId shard id
+     * @param announceServerSet whether to announce itself to server set
+     * @param loadAppraiserClass class name of the load appraiser to instantiate
+     * @param thriftmux flag to enable thrift mux
+     * @param routingService routing service started and stopped together with the server
+     * @param statsReceiver receiver to receive finagle stats
+     * @param statsProvider provider to receive dl stats
+     */
+    DistributedLogServer(Optional<String> uri,
+                         Optional<String> conf,
+                         Optional<String> streamConf,
+                         Optional<Integer> port,
+                         Optional<Integer> statsPort,
+                         Optional<Integer> shardId,
+                         Optional<Boolean> announceServerSet,
+                         Optional<String> loadAppraiserClass,
+                         Optional<Boolean> thriftmux,
+                         RoutingService routingService,
+                         StatsReceiver statsReceiver,
+                         StatsProvider statsProvider) {
+        // command-line style optional settings
+        this.uri = uri;
+        this.conf = conf;
+        this.streamConf = streamConf;
+        this.port = port;
+        this.statsPort = statsPort;
+        this.shardId = shardId;
+        this.announceServerSet = announceServerSet;
+        this.loadAppraiserClassStr = loadAppraiserClass;
+        this.thriftmux = thriftmux;
+
+        // collaborators supplied by the caller
+        this.routingService = routingService;
+        this.statsReceiver = statsReceiver;
+        this.statsProvider = statsProvider;
+    }
+
+    public void runServer()
+        throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
+        if (!uri.isPresent()) {
+            throw new IllegalArgumentException("No distributedlog uri provided.");
+        }
+        URI dlUri = URI.create(uri.get());
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+        if (conf.isPresent()) {
+            String configFile = conf.get();
+            try {
+                dlConf.loadConf(new File(configFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+                    + configFile + ".");
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+                        + configFile + ".");
+            }
+        }
+
+        this.configExecutorService = Executors.newScheduledThreadPool(1,
+                new ThreadFactoryBuilder()
+                        .setNameFormat("DistributedLogService-Dyncfg-%d")
+                        .setDaemon(true)
+                        .build());
+
+        // server configuration and dynamic configuration
+        ServerConfiguration serverConf = new ServerConfiguration();
+        serverConf.loadConf(dlConf);
+
+        // overwrite the shard id if it is provided in the args
+        if (shardId.isPresent()) {
+            serverConf.setServerShardId(shardId.get());
+        }
+
+        serverConf.validate();
+
+        DynamicDistributedLogConfiguration dynDlConf = getServiceDynConf(dlConf);
+
+        logger.info("Starting stats provider : {}", statsProvider.getClass());
+        statsProvider.start(dlConf);
+
+        if (announceServerSet.isPresent() && announceServerSet.get()) {
+            announcer = new ServerSetAnnouncer(
+                    dlUri,
+                    port.or(0),
+                    statsPort.or(0),
+                    shardId.or(0));
+        } else {
+            announcer = new NOPAnnouncer();
+        }
+
+        // Build the stream partition converter
+        StreamPartitionConverter converter;
+        try {
+            converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
+        } catch (ConfigurationException e) {
+            logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
+                    IdentityStreamPartitionConverter.class.getName());
+            converter = new IdentityStreamPartitionConverter();
+        }
+        Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
+        LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
+        logger.info("Load appraiser class is " + loadAppraiserClassStr.or("not specified.") + " Instantiated "
+                + loadAppraiser.getClass().getCanonicalName());
+
+        StreamConfigProvider streamConfProvider =
+                getStreamConfigProvider(dlConf, converter);
+
+        // pre-run
+        preRun(dlConf, serverConf);
+
+        Pair<DistributedLogServiceImpl, Server> serverPair = runServer(
+                serverConf,
+                dlConf,
+                dynDlConf,
+                dlUri,
+                converter,
+                routingService,
+                statsProvider,
+                port.or(0),
+                keepAliveLatch,
+                statsReceiver,
+                thriftmux.isPresent(),
+                streamConfProvider,
+                loadAppraiser);
+
+        this.dlService = serverPair.getLeft();
+        this.server = serverPair.getRight();
+
+        // announce the service
+        announcer.announce();
+        // start the routing service after announced
+        routingService.startService();
+        logger.info("Started the routing service.");
+        dlService.startPlacementPolicy();
+        logger.info("Started the placement policy.");
+    }
+
+    /**
+     * Hook invoked right before the server is started: records the graceful shutdown
+     * period and turns durable writes off in <code>conf</code> when the server
+     * configuration disables them.
+     *
+     * @param conf distributedlog configuration, possibly modified by this call
+     * @param serverConf server configuration to read the settings from
+     */
+    protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
+        this.gracefulShutdownMs = serverConf.getGracefulShutdownPeriodMs();
+        if (serverConf.isDurableWriteEnabled()) {
+            return;
+        }
+        conf.setDurableWriteEnabled(false);
+    }
+
+    /**
+     * Resolves the dynamic configuration of the service: the one loaded from the
+     * configuration file when a file was given and could be loaded, otherwise a constant
+     * view over <code>dlConf</code>.
+     *
+     * @param dlConf distributedlog configuration
+     * @return dynamic configuration to run the service with
+     * @throws ConfigurationException if the dynamic configuration fails to load
+     */
+    private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf)
+        throws ConfigurationException {
+        if (!conf.isPresent()) {
+            return ConfUtils.getConstDynConf(dlConf);
+        }
+        DynamicConfigurationFactory factory = new DynamicConfigurationFactory(
+                configExecutorService, dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
+        Optional<DynamicDistributedLogConfiguration> loaded = factory.getDynamicConfiguration(conf.get());
+        if (!loaded.isPresent()) {
+            return ConfUtils.getConstDynConf(dlConf);
+        }
+        return loaded.get();
+    }
+
+    /**
+     * Chooses the per-stream configuration provider: a service provider when both the
+     * stream configuration path and the configuration file are given, a default provider
+     * when only the configuration file is given, and a null provider otherwise.
+     *
+     * @param dlConf distributedlog configuration (supplies the reload interval)
+     * @param partitionConverter stream partition converter used by the service provider
+     * @return stream configuration provider
+     * @throws ConfigurationException if the provider fails to load its configuration
+     */
+    private StreamConfigProvider getStreamConfigProvider(DistributedLogConfiguration dlConf,
+                                                         StreamPartitionConverter partitionConverter)
+            throws ConfigurationException {
+        StreamConfigProvider provider = new NullStreamConfigProvider();
+        if (conf.isPresent()) {
+            if (streamConf.isPresent()) {
+                String dynConfigPath = streamConf.get();
+                String defaultConfigFile = conf.get();
+                provider = new ServiceStreamConfigProvider(
+                        dynConfigPath,
+                        defaultConfigFile,
+                        partitionConverter,
+                        configExecutorService,
+                        dlConf.getDynamicConfigReloadIntervalSec(),
+                        TimeUnit.SECONDS);
+            } else {
+                String configFile = conf.get();
+                provider = new DefaultStreamConfigProvider(configFile, configExecutorService,
+                        dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
+            }
+        }
+        return provider;
+    }
+
+    /**
+     * Convenience overload that runs the server with a constant dynamic configuration, an
+     * already-released keep-alive latch, no finagle stats and no per-stream configuration.
+     *
+     * @param serverConf server configuration
+     * @param dlConf distributedlog configuration
+     * @param dlUri distributedlog namespace uri
+     * @param converter stream partition converter
+     * @param routingService routing service
+     * @param provider stats provider
+     * @param port listen port
+     * @param thriftmux whether to enable thriftmux
+     * @param loadAppraiser load appraiser
+     * @return the service implementation paired with the running server
+     * @throws IOException if the server fails to start
+     */
+    static Pair<DistributedLogServiceImpl, Server> runServer(
+            ServerConfiguration serverConf,
+            DistributedLogConfiguration dlConf,
+            URI dlUri,
+            StreamPartitionConverter converter,
+            RoutingService routingService,
+            StatsProvider provider,
+            int port,
+            boolean thriftmux,
+            LoadAppraiser loadAppraiser) throws IOException {
+
+        DynamicDistributedLogConfiguration constDynConf = ConfUtils.getConstDynConf(dlConf);
+        CountDownLatch releasedLatch = new CountDownLatch(0);
+        StatsReceiver noFinagleStats = new NullStatsReceiver();
+        StreamConfigProvider noStreamConf = new NullStreamConfigProvider();
+
+        return runServer(serverConf,
+                dlConf,
+                constDynConf,
+                dlUri,
+                converter,
+                routingService,
+                provider,
+                port,
+                releasedLatch,
+                noFinagleStats,
+                thriftmux,
+                noStreamConf,
+                loadAppraiser);
+    }
+
+    /**
+     * Creates the distributedlog service implementation and starts a finagle thrift server
+     * in front of it.
+     *
+     * @param serverConf server configuration
+     * @param dlConf distributedlog configuration
+     * @param dynDlConf dynamic distributedlog configuration
+     * @param dlUri distributedlog namespace uri
+     * @param partitionConverter stream partition converter
+     * @param routingService routing service
+     * @param provider stats provider supplying the dl stats loggers
+     * @param port port to bind the server to
+     * @param keepAliveLatch latch handed to the service implementation
+     * @param statsReceiver receiver to receive finagle stats
+     * @param thriftmux whether to run the server on the thriftmux stack
+     * @param streamConfProvider per-stream configuration provider
+     * @param loadAppraiser load appraiser
+     * @return the service implementation paired with the running server
+     * @throws IOException declared for callers; NOTE(review): no statement visible in this
+     *          method obviously throws it, presumably raised by the service constructor
+     */
+    static Pair<DistributedLogServiceImpl, Server> runServer(
+            ServerConfiguration serverConf,
+            DistributedLogConfiguration dlConf,
+            DynamicDistributedLogConfiguration dynDlConf,
+            URI dlUri,
+            StreamPartitionConverter partitionConverter,
+            RoutingService routingService,
+            StatsProvider provider,
+            int port,
+            CountDownLatch keepAliveLatch,
+            StatsReceiver statsReceiver,
+            boolean thriftmux,
+            StreamConfigProvider streamConfProvider,
+            LoadAppraiser loadAppraiser) throws IOException {
+        logger.info("Running server @ uri {}.", dlUri);
+
+        // per-stream stats go to the "stream" scope, or are discarded when disabled
+        boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled();
+        StatsLogger perStreamStatsLogger;
+        if (perStreamStatsEnabled) {
+            perStreamStatsLogger = provider.getStatsLogger("stream");
+        } else {
+            perStreamStatsLogger = NullStatsLogger.INSTANCE;
+        }
+
+        // dl service
+        DistributedLogServiceImpl dlService = new DistributedLogServiceImpl(
+            serverConf,
+            dlConf,
+            dynDlConf,
+            streamConfProvider,
+            dlUri,
+            partitionConverter,
+            routingService,
+            provider.getStatsLogger(""),
+            perStreamStatsLogger,
+            keepAliveLatch,
+            loadAppraiser);
+
+        // stats for the request filters: finagle receiver and dl logger, both scoped "service"
+        StatsReceiver serviceStatsReceiver = statsReceiver.scope("service");
+        StatsLogger serviceStatsLogger = provider.getStatsLogger("service");
+
+        // framed thrift server bound to the given port on the wildcard address
+        ServerBuilder serverBuilder = ServerBuilder.get()
+                .name("DistributedLogServer")
+                .codec(ThriftServerFramedCodec.get())
+                .reportTo(statsReceiver)
+                .keepAlive(true)
+                .bindTo(new InetSocketAddress(port));
+
+        if (thriftmux) {
+            logger.info("Using thriftmux.");
+            // swap in the thriftmux server stack, configured with a transport liveness param
+            Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness(
+                    Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk();
+            serverBuilder = serverBuilder.stack(
+                ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
+        }
+
+        logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString());
+
+        // starts dl server
+        // requests pass through the client-id filter, then the stats filter, then the thrift service
+        Server server = ServerBuilder.safeBuild(
+                new ClientIdRequiredFilter<byte[], byte[]>(serviceStatsReceiver).andThen(
+                    new StatsFilter<byte[], byte[]>(serviceStatsLogger).andThen(
+                        new DistributedLogService.Service(dlService, new TBinaryProtocol.Factory()))),
+                serverBuilder);
+
+        logger.info("Started DistributedLog Server.");
+        return Pair.of(dlService, server);
+    }
+
+    /**
+     * Shuts down the service implementation, optionally waits for a graceful period, and
+     * then closes the thrift server. Either element of the pair may be null, in which case
+     * the corresponding step is skipped.
+     *
+     * <p>If the calling thread is interrupted during the graceful wait, the wait is cut
+     * short, the thread's interrupt status is restored, and the thrift server is still closed.
+     *
+     * @param pair service implementation (left) and thrift server (right)
+     * @param gracefulShutdownPeriod how long to wait after shutting down the service;
+     *          no wait when not positive
+     * @param timeUnit unit of <code>gracefulShutdownPeriod</code>
+     */
+    static void closeServer(Pair<DistributedLogServiceImpl, Server> pair,
+                            long gracefulShutdownPeriod,
+                            TimeUnit timeUnit) {
+        if (null != pair.getLeft()) {
+            pair.getLeft().shutdown();
+            if (gracefulShutdownPeriod > 0) {
+                try {
+                    timeUnit.sleep(gracefulShutdownPeriod);
+                } catch (InterruptedException e) {
+                    logger.info("Interrupted on waiting service shutting down state propagated to all clients : ", e);
+                    // do not swallow the interrupt: let callers further up observe it
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        if (null != pair.getRight()) {
+            logger.info("Closing dl thrift server.");
+            pair.getRight().close();
+            logger.info("Closed dl thrift server.");
+        }
+    }
+
+    /**
+     * Close the server.
+     *
+     * <p>Unannounces and closes the announcer (when one is set), shuts down the service and the
+     * thrift server through {@code closeServer}, stops the routing service and the stats provider
+     * (when one is set), shuts down {@code configExecutorService} and finally counts down the
+     * keep-alive latch that {@link #join()} waits on.
+     */
+    public void close() {
+        if (null != announcer) {
+            try {
+                announcer.unannounce();
+            } catch (IOException e) {
+                // a failed unannounce is only logged; the rest of the shutdown still runs
+                logger.warn("Error on unannouncing service : ", e);
+            }
+            announcer.close();
+        }
+        closeServer(Pair.of(dlService, server), gracefulShutdownMs, TimeUnit.MILLISECONDS);
+        routingService.stopService();
+        if (null != statsProvider) {
+            statsProvider.stop();
+        }
+        SchedulerUtils.shutdownScheduler(configExecutorService, 60, TimeUnit.SECONDS);
+        // release any thread blocked in join()
+        keepAliveLatch.countDown();
+    }
+
+    /**
+     * Block until the keep-alive latch is counted down, e.g. by {@link #close()}.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void join() throws InterruptedException {
+        keepAliveLatch.await();
+    }
+
+    /**
+     * Running distributedlog server.
+     *
+     * <p>Constructs a {@link DistributedLogServer} from the given arguments, invokes its
+     * instance {@code runServer()} and returns it.
+     *
+     * @param uri distributedlog namespace
+     * @param conf distributedlog configuration file location
+     * @param streamConf per stream configuration dir location
+     * @param port listen port
+     * @param statsPort stats port
+     * @param shardId shard id
+     * @param announceServerSet whether to announce itself to server set
+     * @param loadAppraiserClass load appraiser class to use (presumably a class name - confirm against the constructor)
+     * @param thriftmux flag to enable thrift mux
+     * @param routingService routing service passed to the server
+     * @param statsReceiver receiver to receive finagle stats
+     * @param statsProvider provider to receive dl stats
+     * @return distributedlog server
+     * @throws ConfigurationException
+     * @throws IllegalArgumentException
+     * @throws IOException
+     * @throws ClassNotFoundException
+     */
+    public static DistributedLogServer runServer(
+               Optional<String> uri,
+               Optional<String> conf,
+               Optional<String> streamConf,
+               Optional<Integer> port,
+               Optional<Integer> statsPort,
+               Optional<Integer> shardId,
+               Optional<Boolean> announceServerSet,
+               Optional<String> loadAppraiserClass,
+               Optional<Boolean> thriftmux,
+               RoutingService routingService,
+               StatsReceiver statsReceiver,
+               StatsProvider statsProvider)
+        throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
+
+        final DistributedLogServer server = new DistributedLogServer(
+                uri,
+                conf,
+                streamConf,
+                port,
+                statsPort,
+                shardId,
+                announceServerSet,
+                loadAppraiserClass,
+                thriftmux,
+                routingService,
+                statsReceiver,
+                statsProvider);
+
+        server.runServer();
+        return server;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
new file mode 100644
index 0000000..a1642f9
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.client.routing.RoutingUtils;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The launcher of the distributedlog proxy server.
+ *
+ * <p>Parses the command line options, loads the optional configuration file and stats
+ * provider, builds a routing service for the namespace uri and runs a
+ * {@link DistributedLogServer} until it finishes.
+ */
+public class DistributedLogServerApp {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
+
+    private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
+    private final String[] args;
+    private final Options options = new Options();
+
+    /**
+     * Create the launcher and register the supported command line options.
+     *
+     * @param args raw command line arguments
+     */
+    private DistributedLogServerApp(String[] args) {
+        this.args = args;
+
+        // prepare options
+        options.addOption("u", "uri", true, "DistributedLog URI");
+        options.addOption("c", "conf", true, "DistributedLog Configuration File");
+        options.addOption("sc", "stream-conf", true, "Per Stream Configuration Directory");
+        options.addOption("p", "port", true, "DistributedLog Server Port");
+        options.addOption("sp", "stats-port", true, "DistributedLog Stats Port");
+        options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
+        options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
+        options.addOption("a", "announce", false, "ServerSet Path to Announce");
+        options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use");
+        options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
+    }
+
+    /**
+     * Print the usage line together with the registered options.
+     */
+    private void printUsage() {
+        HelpFormatter helpFormatter = new HelpFormatter();
+        helpFormatter.printHelp(USAGE, options);
+    }
+
+    /**
+     * Parse the arguments and run the server; any startup failure is logged and
+     * terminates the JVM with exit code -1.
+     */
+    private void run() {
+        try {
+            logger.info("Running distributedlog server : args = {}", Arrays.toString(args));
+            BasicParser parser = new BasicParser();
+            CommandLine cmdline = parser.parse(options, args);
+            runCmd(cmdline);
+        } catch (ParseException pe) {
+            logger.error("Argument error : {}", pe.getMessage());
+            printUsage();
+            Runtime.getRuntime().exit(-1);
+        } catch (IllegalArgumentException iae) {
+            logger.error("Argument error : {}", iae.getMessage());
+            printUsage();
+            Runtime.getRuntime().exit(-1);
+        } catch (ConfigurationException ce) {
+            logger.error("Configuration error : {}", ce.getMessage());
+            printUsage();
+            Runtime.getRuntime().exit(-1);
+        } catch (IOException ie) {
+            logger.error("Failed to start distributedlog server : ", ie);
+            Runtime.getRuntime().exit(-1);
+        } catch (ClassNotFoundException cnf) {
+            logger.error("Failed to start distributedlog server : ", cnf);
+            Runtime.getRuntime().exit(-1);
+        }
+    }
+
+    /**
+     * Build the server from the parsed command line, run it and wait for it to finish.
+     *
+     * @param cmdline parsed command line
+     * @throws IllegalArgumentException if the configuration file cannot be loaded or no uri is provided
+     * @throws IOException propagated from starting the server
+     * @throws ConfigurationException propagated from starting the server
+     * @throws ClassNotFoundException propagated from starting the server
+     */
+    private void runCmd(CommandLine cmdline)
+        throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
+        final StatsReceiver statsReceiver = NullStatsReceiver.get();
+        Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+        if (confOptional.isPresent()) {
+            String configFile = confOptional.get();
+            try {
+                dlConf.loadConf(new File(configFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                // keep the original failure as the cause so that it is not lost
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+                    + configFile + ".", e);
+            } catch (MalformedURLException e) {
+                // keep the original failure as the cause so that it is not lost
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+                        + configFile + ".", e);
+            }
+        }
+        // load the stats provider
+        final StatsProvider statsProvider = getOptionalStringArg(cmdline, "pd")
+                .transform(new Function<String, StatsProvider>() {
+                    @Nullable
+                    @Override
+                    public StatsProvider apply(@Nullable String name) {
+                        return ReflectionUtils.newInstance(name, StatsProvider.class);
+                    }
+                }).or(new NullStatsProvider());
+
+        final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
+        checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
+        URI dlUri = URI.create(uriOption.get());
+
+        DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));
+        RoutingService routingService = RoutingUtils.buildRoutingService(serverSet.getServerSet())
+                .statsReceiver(statsReceiver.scope("routing"))
+                .build();
+
+        final DistributedLogServer server = DistributedLogServer.runServer(
+                uriOption,
+                confOptional,
+                getOptionalStringArg(cmdline, "sc"),
+                getOptionalIntegerArg(cmdline, "p"),
+                getOptionalIntegerArg(cmdline, "sp"),
+                getOptionalIntegerArg(cmdline, "si"),
+                getOptionalBooleanArg(cmdline, "a"),
+                getOptionalStringArg(cmdline, "la"),
+                getOptionalBooleanArg(cmdline, "mx"),
+                routingService,
+                statsReceiver,
+                statsProvider);
+
+        // close the server when the JVM is asked to shut down
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                logger.info("Closing DistributedLog Server.");
+                server.close();
+                logger.info("Closed DistributedLog Server.");
+                statsProvider.stop();
+            }
+        });
+
+        try {
+            server.join();
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted when waiting distributedlog server to be finished : ", e);
+        }
+
+        // NOTE(review): the server and the stats provider are closed both here and in the
+        // shutdown hook above - confirm that closing them twice is safe.
+        logger.info("DistributedLog Service Interrupted.");
+        server.close();
+        logger.info("Closed DistributedLog Server.");
+        statsProvider.stop();
+    }
+
+    /**
+     * Entry point of the proxy server launcher.
+     *
+     * @param args command line arguments
+     */
+    public static void main(String[] args) {
+        final DistributedLogServerApp launcher = new DistributedLogServerApp(args);
+        launcher.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
new file mode 100644
index 0000000..c37cd53
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
@@ -0,0 +1,794 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.net.InetSocketAddressHelper;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.RegionUnavailableException;
+import org.apache.distributedlog.exceptions.ServiceUnavailableException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.exceptions.TooManyStreamsException;
+import org.apache.distributedlog.feature.AbstractFeatureProvider;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.rate.MovingAverageRateFactory;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.placement.LeastLoadPlacementPolicy;
+import org.apache.distributedlog.service.placement.LoadAppraiser;
+import org.apache.distributedlog.service.placement.PlacementPolicy;
+import org.apache.distributedlog.service.placement.ZKPlacementStateManager;
+import org.apache.distributedlog.service.stream.BulkWriteOp;
+import org.apache.distributedlog.service.stream.DeleteOp;
+import org.apache.distributedlog.service.stream.HeartbeatOp;
+import org.apache.distributedlog.service.stream.ReleaseOp;
+import org.apache.distributedlog.service.stream.Stream;
+import org.apache.distributedlog.service.stream.StreamFactory;
+import org.apache.distributedlog.service.stream.StreamFactoryImpl;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.service.stream.StreamManagerImpl;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.distributedlog.service.stream.StreamOpStats;
+import org.apache.distributedlog.service.stream.TruncateOp;
+import org.apache.distributedlog.service.stream.WriteOp;
+import org.apache.distributedlog.service.stream.WriteOpWithPayload;
+import org.apache.distributedlog.service.stream.admin.CreateOp;
+import org.apache.distributedlog.service.stream.admin.StreamAdminOp;
+import org.apache.distributedlog.service.stream.limiter.ServiceRequestLimiter;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.service.utils.ServerUtils;
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.ClientInfo;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import org.apache.distributedlog.thrift.service.ServerStatus;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.SchedulerUtils;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Timer;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Implementation of distributedlog thrift service.
+ */
+public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
+                                                  FatalErrorHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
+
+    private static final int MOVING_AVERAGE_WINDOW_SECS = 60;
+
+    private final ServerConfiguration serverConfig;
+    private final DistributedLogConfiguration dlConfig;
+    private final DistributedLogNamespace dlNamespace;
+    private final int serverRegionId;
+    private final PlacementPolicy placementPolicy;
+    private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
+    private final ReentrantReadWriteLock closeLock =
+            new ReentrantReadWriteLock();
+    private final CountDownLatch keepAliveLatch;
+    private final byte dlsnVersion;
+    private final String clientId;
+    private final OrderedScheduler scheduler;
+    private final AccessControlManager accessControlManager;
+    private final StreamConfigProvider streamConfigProvider;
+    private final StreamManager streamManager;
+    private final StreamFactory streamFactory;
+    private final RoutingService routingService;
+    private final RegionResolver regionResolver;
+    private final MovingAverageRateFactory movingAvgFactory;
+    private final MovingAverageRate windowedRps;
+    private final MovingAverageRate windowedBps;
+    private final ServiceRequestLimiter limiter;
+    private final Timer timer;
+    private final HashedWheelTimer requestTimer;
+
+    // Features
+    private final FeatureProvider featureProvider;
+    private final Feature featureRegionStopAcceptNewStream;
+    private final Feature featureChecksumDisabled;
+    private final Feature limiterDisabledFeature;
+
+    // Stats
+    private final StatsLogger statsLogger;
+    private final StatsLogger perStreamStatsLogger;
+    private final StreamPartitionConverter streamPartitionConverter;
+    private final StreamOpStats streamOpStats;
+    private final Counter bulkWritePendingStat;
+    private final Counter writePendingStat;
+    private final Counter redirects;
+    private final Counter receivedRecordCounter;
+    private final StatsLogger statusCodeStatLogger;
+    private final ConcurrentHashMap<StatusCode, Counter> statusCodeCounters =
+            new ConcurrentHashMap<StatusCode, Counter>();
+    private final Counter statusCodeTotal;
+    private final Gauge<Number> proxyStatusGauge;
+    private final Gauge<Number> movingAvgRpsGauge;
+    private final Gauge<Number> movingAvgBpsGauge;
+    private final Gauge<Number> streamAcquiredGauge;
+    private final Gauge<Number> streamCachedGauge;
+    private final int shard;
+
+    /**
+     * Build the service: feature provider, namespace, scheduler, request timer, stream
+     * factory/manager, request limiter, placement policy and the stats gauges/counters.
+     *
+     * @param serverConf server configuration
+     * @param dlConf distributedlog configuration; its ledger allocator pool name is set here
+     * @param dynDlConf dynamic configuration handed to the service request limiter
+     * @param streamConfigProvider per stream configuration provider
+     * @param uri namespace uri
+     * @param converter stream partition converter
+     * @param routingService routing service
+     * @param statsLogger stats logger for the service
+     * @param perStreamStatsLogger stats logger for per stream stats
+     * @param keepAliveLatch keep-alive latch retained by the service
+     * @param loadAppraiser load appraiser used by the placement policy
+     * @throws IOException propagated from building the namespace and its dependents
+     */
+    DistributedLogServiceImpl(ServerConfiguration serverConf,
+                              DistributedLogConfiguration dlConf,
+                              DynamicDistributedLogConfiguration dynDlConf,
+                              StreamConfigProvider streamConfigProvider,
+                              URI uri,
+                              StreamPartitionConverter converter,
+                              RoutingService routingService,
+                              StatsLogger statsLogger,
+                              StatsLogger perStreamStatsLogger,
+                              CountDownLatch keepAliveLatch,
+                              LoadAppraiser loadAppraiser)
+            throws IOException {
+        // Configuration.
+        this.serverConfig = serverConf;
+        this.dlConfig = dlConf;
+        this.perStreamStatsLogger = perStreamStatsLogger;
+        this.dlsnVersion = serverConf.getDlsnVersion();
+        this.serverRegionId = serverConf.getRegionId();
+        this.streamPartitionConverter = converter;
+        int serverPort = serverConf.getServerPort();
+        this.shard = serverConf.getServerShardId();
+        int numThreads = serverConf.getServerThreads();
+        this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard);
+        String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName(
+            serverRegionId,
+            shard,
+            serverConf.isUseHostnameAsAllocatorPoolName());
+        dlConf.setLedgerAllocatorPoolName(allocatorPoolName);
+        this.featureProvider = AbstractFeatureProvider.getFeatureProvider("", dlConf, statsLogger.scope("features"));
+        if (this.featureProvider instanceof AbstractFeatureProvider) {
+            ((AbstractFeatureProvider) featureProvider).start();
+        }
+
+        // Build the namespace
+        this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(dlConf)
+                .uri(uri)
+                .statsLogger(statsLogger)
+                .featureProvider(this.featureProvider)
+                .clientId(clientId)
+                .regionId(serverRegionId)
+                .build();
+        this.accessControlManager = this.dlNamespace.createAccessControlManager();
+        this.keepAliveLatch = keepAliveLatch;
+        this.streamConfigProvider = streamConfigProvider;
+
+        // Stats pertaining to stream op execution
+        this.streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+
+        // Executor Service.
+        this.scheduler = OrderedScheduler.newBuilder()
+                .corePoolSize(numThreads)
+                .name("DistributedLogService-Executor")
+                .traceTaskExecution(true)
+                .statsLogger(statsLogger.scope("scheduler"))
+                .build();
+
+        // Timer, kept separate to ensure reliability of timeouts.
+        this.requestTimer = new HashedWheelTimer(
+            new ThreadFactoryBuilder().setNameFormat("DLServiceTimer-%d").build(),
+            dlConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
+            dlConf.getTimeoutTimerNumTicks());
+
+        // Creating and managing Streams
+        this.streamFactory = new StreamFactoryImpl(clientId,
+                streamOpStats,
+                serverConf,
+                dlConf,
+                featureProvider,
+                streamConfigProvider,
+                converter,
+                dlNamespace,
+                scheduler,
+                this,
+                requestTimer);
+        this.streamManager = new StreamManagerImpl(
+                clientId,
+                dlConf,
+                scheduler,
+                streamFactory,
+                converter,
+                streamConfigProvider,
+                dlNamespace);
+        this.routingService = routingService;
+        this.regionResolver = new DefaultRegionResolver();
+
+        // Service features
+        // Feature keys are lower-cased with Locale.ROOT so that they do not depend on the
+        // default locale of the JVM (e.g. the Turkish dotless i).
+        this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature(
+                ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase(java.util.Locale.ROOT));
+        this.featureChecksumDisabled = this.featureProvider.getFeature(
+                ServerFeatureKeys.SERVICE_CHECKSUM_DISABLED.name().toLowerCase(java.util.Locale.ROOT));
+        this.limiterDisabledFeature = this.featureProvider.getFeature(
+                ServerFeatureKeys.SERVICE_GLOBAL_LIMITER_DISABLED.name().toLowerCase(java.util.Locale.ROOT));
+
+        // Resource limiting
+        this.timer = new ScheduledThreadPoolTimer(1, "timer", true);
+        this.movingAvgFactory = new MovingAverageRateFactory(timer);
+        this.windowedRps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
+        this.windowedBps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
+        this.limiter = new ServiceRequestLimiter(
+                dynDlConf,
+                streamOpStats.baseScope("service_limiter"),
+                windowedRps,
+                windowedBps,
+                streamManager,
+                limiterDisabledFeature);
+
+        this.placementPolicy = new LeastLoadPlacementPolicy(
+            loadAppraiser,
+            routingService,
+            dlNamespace,
+            new ZKPlacementStateManager(uri, dlConf, statsLogger),
+            Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()),
+            statsLogger);
+        logger.info("placement started");
+
+        // Stats
+        this.statsLogger = statsLogger;
+
+        // Gauges for server status/health
+        this.proxyStatusGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                // -1: down, 3: region stopped accepting new streams, 1: write and accept, 2: otherwise
+                return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable()
+                    ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
+            }
+        };
+        this.movingAvgRpsGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return windowedRps.get();
+            }
+        };
+        this.movingAvgBpsGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return windowedBps.get();
+            }
+        };
+        // Gauges for streams
+        this.streamAcquiredGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return streamManager.numAcquired();
+            }
+        };
+        this.streamCachedGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return streamManager.numCached();
+            }
+        };
+
+        // Stats on server
+        statsLogger.registerGauge("proxy_status", proxyStatusGauge);
+        // Global moving average rps
+        statsLogger.registerGauge("moving_avg_rps", movingAvgRpsGauge);
+        // Global moving average bps
+        statsLogger.registerGauge("moving_avg_bps", movingAvgBpsGauge);
+        // Stats on requests
+        this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending");
+        this.writePendingStat = streamOpStats.requestPendingCounter("writePending");
+        this.redirects = streamOpStats.requestCounter("redirect");
+        this.statusCodeStatLogger = streamOpStats.requestScope("statuscode");
+        this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count");
+        this.receivedRecordCounter = streamOpStats.recordsCounter("received");
+
+        // Stats for streams
+        StatsLogger streamsStatsLogger = statsLogger.scope("streams");
+        streamsStatsLogger.registerGauge("acquired", this.streamAcquiredGauge);
+        streamsStatsLogger.registerGauge("cached", this.streamCachedGauge);
+
+        // Setup complete
+        logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {},"
+            + " dlsn version {}.",
+            new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
+    }
+
+    /**
+     * Increment the counter of the given status code (created on first use) and the
+     * total status code counter.
+     *
+     * @param code status code to count
+     */
+    private void countStatusCode(StatusCode code) {
+        Counter codeCounter = statusCodeCounters.get(code);
+        if (null == codeCounter) {
+            final Counter created = statusCodeStatLogger.getCounter(code.name());
+            // if another thread registered a counter first, use that one
+            final Counter existing = statusCodeCounters.putIfAbsent(code, created);
+            codeCounter = (null == existing) ? created : existing;
+        }
+        codeCounter.inc();
+        statusCodeTotal.inc();
+    }
+
+    /**
+     * Handshake without client details; delegates to
+     * {@link #handshakeWithClientInfo(ClientInfo)} with a default-constructed {@link ClientInfo}.
+     */
+    @Override
+    public Future<ServerInfo> handshake() {
+        return handshakeWithClientInfo(new ClientInfo());
+    }
+
+    /**
+     * Report the current server status and, unless the client explicitly declined them,
+     * the stream ownerships (optionally filtered by the client's stream name regex).
+     *
+     * @param clientInfo client info received on handshake
+     * @return future holding the server info
+     */
+    @Override
+    public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
+        final ServerInfo info = new ServerInfo();
+        // the status is read under the read lock
+        closeLock.readLock().lock();
+        try {
+            info.setServerStatus(serverStatus);
+        } finally {
+            closeLock.readLock().unlock();
+        }
+
+        // ownerships are skipped only when the flag is set and is false
+        final boolean ownershipsDeclined = clientInfo.isSetGetOwnerships() && !clientInfo.isGetOwnerships();
+        if (!ownershipsDeclined) {
+            final Optional<String> nameFilter = clientInfo.isSetStreamNameRegex()
+                ? Optional.of(clientInfo.getStreamNameRegex())
+                : Optional.<String>absent();
+            info.setOwnerships(streamManager.getStreamOwnershipMap(nameFilter));
+        }
+        return Future.value(info);
+    }
+
+    /**
+     * Get the stream for the given name, creating it when it is not known yet.
+     *
+     * @param stream stream name
+     * @return the stream, or null if it is not known and the server status is not WRITE_AND_ACCEPT
+     * @throws RegionUnavailableException if the stream is not known and the region stopped accepting new streams
+     * @throws IOException declared for failures of the underlying stream manager
+     */
+    @VisibleForTesting
+    Stream getLogWriter(String stream) throws IOException {
+        final Stream known = streamManager.getStream(stream);
+        if (null != known) {
+            return known;
+        }
+        closeLock.readLock().lock();
+        try {
+            if (featureRegionStopAcceptNewStream.isAvailable()) {
+                // accept new stream is disabled in current dc
+                throw new RegionUnavailableException("Region is unavailable right now.");
+            }
+            if (ServerStatus.WRITE_AND_ACCEPT != serverStatus) {
+                // if it is closed, we would not acquire stream again.
+                return null;
+            }
+            return streamManager.getOrCreateStream(stream, true);
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    // Service interface methods
+
+    /**
+     * Write a single record to the given stream without a checksum.
+     *
+     * <p>The received-record counter is intentionally not touched here:
+     * {@code doWrite} already increments it once per record, so incrementing it in this
+     * method as well would count every record written through this entry point twice.
+     *
+     * @param stream stream name
+     * @param data record payload
+     * @return future carrying the write response
+     */
+    @Override
+    public Future<WriteResponse> write(final String stream, ByteBuffer data) {
+        return doWrite(stream, data, null /* checksum */, false);
+    }
+
+    /**
+     * Write a batch of records to the given stream.
+     *
+     * <p>The pending bulk-write stat is incremented up front and decremented once the
+     * operation's result future completes.
+     *
+     * @param stream stream name
+     * @param data record payloads; each one is counted as a received record
+     * @param ctx write context, consulted for the optional CRC32 checksum
+     * @return future carrying the bulk write response
+     */
+    @Override
+    public Future<BulkWriteResponse> writeBulkWithContext(final String stream,
+                                                          List<ByteBuffer> data,
+                                                          WriteContext ctx) {
+        bulkWritePendingStat.inc();
+        receivedRecordCounter.add(data.size());
+
+        Long checksum = getChecksum(ctx);
+        BulkWriteOp bulkOp = new BulkWriteOp(
+            stream,
+            data,
+            statsLogger,
+            perStreamStatsLogger,
+            streamPartitionConverter,
+            checksum,
+            featureChecksumDisabled,
+            accessControlManager);
+        executeStreamOp(bulkOp);
+
+        Function0<BoxedUnit> markNoLongerPending = new Function0<BoxedUnit>() {
+            public BoxedUnit apply() {
+                bulkWritePendingStat.dec();
+                return null;
+            }
+        };
+        return bulkOp.result().ensure(markNoLongerPending);
+    }
+
+    /**
+     * Write a single record using the checksum and record-set flag carried by the context.
+     *
+     * @param stream stream name
+     * @param data record payload
+     * @param ctx write context supplying the optional CRC32 and the record-set flag
+     * @return future carrying the write response
+     */
+    @Override
+    public Future<WriteResponse> writeWithContext(final String stream, ByteBuffer data, WriteContext ctx) {
+        final Long checksum = getChecksum(ctx);
+        final boolean isRecordSet = ctx.isIsRecordSet();
+        return doWrite(stream, data, checksum, isRecordSet);
+    }
+
+    /**
+     * Submit a heartbeat operation for the given stream.
+     *
+     * @param stream stream name
+     * @param ctx write context, consulted for the optional CRC32 checksum
+     * @return future carrying the heartbeat's write response
+     */
+    @Override
+    public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
+        HeartbeatOp heartbeatOp = new HeartbeatOp(
+            stream,
+            statsLogger,
+            perStreamStatsLogger,
+            dlsnVersion,
+            getChecksum(ctx),
+            featureChecksumDisabled,
+            accessControlManager);
+        executeStreamOp(heartbeatOp);
+        return heartbeatOp.result();
+    }
+
+    @Override
+    public Future<WriteResponse> heartbeatWithOptions(String stream, WriteContext ctx, HeartbeatOptions options) {
+        HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx),
+            featureChecksumDisabled, accessControlManager);
+        if (options.isSendHeartBeatToReader()) {
+            op.setWriteControlRecord(true);
+        }
+        executeStreamOp(op);
+        return op.result();
+    }
+
+    /**
+     * Submit a truncate operation for the given stream.
+     *
+     * @param stream stream name
+     * @param dlsn serialized DLSN, decoded with {@link DLSN#deserialize(String)}
+     * @param ctx write context, consulted for the optional CRC32 checksum
+     * @return future carrying the truncate's write response
+     */
+    @Override
+    public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) {
+        DLSN truncationPoint = DLSN.deserialize(dlsn);
+        Long checksum = getChecksum(ctx);
+        TruncateOp truncateOp = new TruncateOp(stream, truncationPoint, statsLogger, perStreamStatsLogger,
+            checksum, featureChecksumDisabled, accessControlManager);
+        executeStreamOp(truncateOp);
+        return truncateOp.result();
+    }
+
+    /**
+     * Submit a delete operation for the given stream.
+     *
+     * @param stream stream name
+     * @param ctx write context, consulted for the optional CRC32 checksum
+     * @return future carrying the delete's write response
+     */
+    @Override
+    public Future<WriteResponse> delete(String stream, WriteContext ctx) {
+        DeleteOp deleteOp = new DeleteOp(
+            stream,
+            statsLogger,
+            perStreamStatsLogger,
+            streamManager,
+            getChecksum(ctx),
+            featureChecksumDisabled,
+            accessControlManager);
+        executeStreamOp(deleteOp);
+        return deleteOp.result();
+    }
+
+    /**
+     * Submit a release operation for the given stream.
+     *
+     * @param stream stream name
+     * @param ctx write context, consulted for the optional CRC32 checksum
+     * @return future carrying the release's write response
+     */
+    @Override
+    public Future<WriteResponse> release(String stream, WriteContext ctx) {
+        ReleaseOp releaseOp = new ReleaseOp(
+            stream,
+            statsLogger,
+            perStreamStatsLogger,
+            streamManager,
+            getChecksum(ctx),
+            featureChecksumDisabled,
+            accessControlManager);
+        executeStreamOp(releaseOp);
+        return releaseOp.result();
+    }
+
+    /**
+     * Run a create operation for the given stream through the admin-op path.
+     *
+     * @param stream stream name
+     * @param ctx write context, consulted for the optional CRC32 checksum
+     * @return future carrying the create's write response
+     */
+    @Override
+    public Future<WriteResponse> create(String stream, WriteContext ctx) {
+        Long checksum = getChecksum(ctx);
+        CreateOp createOp = new CreateOp(stream, statsLogger, streamManager, checksum, featureChecksumDisabled);
+        return executeStreamAdminOp(createOp);
+    }
+
+    //
+    // Ownership RPC
+    //
+
+    /**
+     * Report the owner of a stream: this server (identified by its client id) when the
+     * stream manager says the stream is acquired, otherwise the server chosen by the
+     * placement policy.
+     *
+     * @param streamName stream name
+     * @param ctx write context (not consulted by this method)
+     * @return future carrying a response whose header is built from the owner
+     */
+    @Override
+    public Future<WriteResponse> getOwner(String streamName, WriteContext ctx) {
+        if (!streamManager.isAcquired(streamName)) {
+            // not acquired here: let the placement policy pick a server
+            Future<String> placement = placementPolicy.placeStream(streamName);
+            return placement.map(new Function<String, WriteResponse>() {
+                @Override
+                public WriteResponse apply(String server) {
+                    String ownerId = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1);
+                    return new WriteResponse(ResponseUtils.ownerToHeader(ownerId));
+                }
+            });
+        }
+
+        // the stream is already acquired
+        return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId)));
+    }
+
+
+    //
+    // Admin RPCs
+    //
+
+    /**
+     * Toggle whether the server accepts new streams. Has no effect on the status once
+     * the server is DOWN.
+     *
+     * @param enabled {@code true} switches to WRITE_AND_ACCEPT, {@code false} to WRITE_ONLY
+     * @return a completed void future
+     */
+    @Override
+    public Future<Void> setAcceptNewStream(boolean enabled) {
+        // serverStatus is mutated while holding the write side of closeLock.
+        closeLock.writeLock().lock();
+        try {
+            logger.info("Set AcceptNewStream = {}", enabled);
+            if (ServerStatus.DOWN != serverStatus) {
+                serverStatus = enabled ? ServerStatus.WRITE_AND_ACCEPT : ServerStatus.WRITE_ONLY;
+            }
+        } finally {
+            closeLock.writeLock().unlock();
+        }
+        return Future.Void();
+    }
+
+    /**
+     * Shared single-record write path: updates the pending-write stat and the
+     * received-record counter, builds the write op and submits it.
+     *
+     * @param name stream name
+     * @param data record payload
+     * @param checksum request checksum, or {@code null} when none was supplied
+     * @param isRecordSet whether the payload is a record set
+     * @return future carrying the write response; the pending-write stat is decremented
+     *         when it completes
+     */
+    private Future<WriteResponse> doWrite(final String name,
+                                          ByteBuffer data,
+                                          Long checksum,
+                                          boolean isRecordSet) {
+        writePendingStat.inc();
+        receivedRecordCounter.inc();
+
+        WriteOp writeOp = newWriteOp(name, data, checksum, isRecordSet);
+        executeStreamOp(writeOp);
+
+        Function0<BoxedUnit> markNoLongerPending = new Function0<BoxedUnit>() {
+            public BoxedUnit apply() {
+                writePendingStat.dec();
+                return null;
+            }
+        };
+        return writeOp.result().ensure(markNoLongerPending);
+    }
+
+    /**
+     * Extract the CRC32 checksum from the write context.
+     *
+     * @param ctx write context
+     * @return the CRC32 value when the context has one set, otherwise {@code null}
+     */
+    private Long getChecksum(WriteContext ctx) {
+        if (ctx.isSetCrc32()) {
+            return ctx.getCrc32();
+        }
+        return null;
+    }
+
+    /**
+     * Run an admin operation: pre-execute it, then execute it directly.
+     *
+     * @param op admin operation to run
+     * @return the operation's result, or a failed future carrying the {@link DLException}
+     *         thrown by {@code preExecute()}
+     */
+    private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) {
+        try {
+            op.preExecute();
+        } catch (DLException preExecuteFailure) {
+            // surface the pre-execution failure through the returned future
+            return Future.exception(preExecuteFailure);
+        }
+
+        return op.execute();
+    }
+
+    /**
+     * Run a stream operation: attach status-code accounting, apply the request limiter and
+     * the op's pre-execution hook, resolve the target stream and submit the op to it.
+     *
+     * <p>This method does not throw for the failures it handles; each one is reported by
+     * calling {@code op.fail(...)} and returning early.
+     *
+     * @param op stream operation to execute
+     */
+    private void executeStreamOp(final StreamOp op) {
+
+        // Must attach this as early as possible--returning before this point will cause us to
+        // lose the status code.
+        op.responseHeader().addEventListener(new FutureEventListener<ResponseHeader>() {
+            @Override
+            public void onSuccess(ResponseHeader header) {
+                // A response counts as a redirect when it carries a location or has code FOUND.
+                if (header.getLocation() != null || header.getCode() == StatusCode.FOUND) {
+                    redirects.inc();
+                }
+                countStatusCode(header.getCode());
+            }
+            @Override
+            public void onFailure(Throwable cause) {
+                // Nothing is counted when the response header future fails.
+            }
+        });
+
+        try {
+            // Apply the request limiter
+            limiter.apply(op);
+
+            // Execute per-op pre-exec code
+            op.preExecute();
+
+        } catch (TooManyStreamsException e) {
+            // Translate to StreamUnavailableException to ensure that the client will redirect
+            // to a different host. Ideally we would be able to return TooManyStreamsException,
+            // but the way exception handling works right now we can't control the handling in
+            // the client because client changes deploy very slowly.
+            op.fail(new StreamUnavailableException(e.getMessage()));
+            return;
+        } catch (Exception e) {
+            // Any other limiter or pre-execution failure is passed to the op unchanged.
+            op.fail(e);
+            return;
+        }
+
+        Stream stream;
+        try {
+            stream = getLogWriter(op.streamName());
+        } catch (RegionUnavailableException rue) {
+            // redirect the requests to other region
+            // (a new exception naming this server's region replaces the caught one)
+            op.fail(new RegionUnavailableException("Region " + serverRegionId + " is unavailable."));
+            return;
+        } catch (IOException e) {
+            op.fail(e);
+            return;
+        }
+        if (null == stream) {
+            // redirect the requests when stream is unavailable.
+            op.fail(new ServiceUnavailableException("Server " + clientId + " is closed."));
+            return;
+        }
+
+        // Feed the windowed bytes-per-second / requests-per-second trackers, but only for
+        // ops that carry a payload.
+        if (op instanceof WriteOpWithPayload) {
+            WriteOpWithPayload writeOp = (WriteOpWithPayload) op;
+            windowedBps.add(writeOp.getPayloadSize());
+            windowedRps.inc();
+        }
+
+        stream.submit(op);
+    }
+
+    /**
+     * Shut the service down. Calls after the first are no-ops apart from releasing the
+     * keep-alive latch and logging completion.
+     *
+     * <p>Marks the server DOWN under the write lock, closes the stream manager, moving
+     * average factory and limiter, waits up to five minutes for all streams to close, then
+     * closes the namespace, stops the feature provider (when it is an
+     * {@link AbstractFeatureProvider}), the timer and the placement policy, unregisters the
+     * gauges and shuts down the scheduler. Any exception escaping these steps is logged
+     * together with its stack trace instead of being silently dropped. The keep-alive latch
+     * is always released at the end.
+     */
+    void shutdown() {
+        try {
+            closeLock.writeLock().lock();
+            try {
+                if (ServerStatus.DOWN == serverStatus) {
+                    return;
+                }
+                serverStatus = ServerStatus.DOWN;
+            } finally {
+                closeLock.writeLock().unlock();
+            }
+
+            streamManager.close();
+            movingAvgFactory.close();
+            limiter.close();
+
+            Stopwatch closeStreamsStopwatch = Stopwatch.createStarted();
+
+            Future<List<Void>> closeResult = streamManager.closeStreams();
+            logger.info("Waiting for closing all streams ...");
+            try {
+                Await.result(closeResult, Duration.fromTimeUnit(5, TimeUnit.MINUTES));
+                logger.info("Closed all streams in {} millis.",
+                        closeStreamsStopwatch.elapsed(TimeUnit.MILLISECONDS));
+            } catch (InterruptedException e) {
+                logger.warn("Interrupted on waiting for closing all streams : ", e);
+                // restore the interrupt flag for callers further up the stack
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                logger.warn("Sorry, we didn't close all streams gracefully in 5 minutes : ", e);
+            }
+
+            // shutdown the dl namespace
+            logger.info("Closing distributedlog namespace ...");
+            dlNamespace.close();
+            logger.info("Closed distributedlog namespace .");
+
+            // Stop the feature provider
+            if (this.featureProvider instanceof AbstractFeatureProvider) {
+                ((AbstractFeatureProvider) featureProvider).stop();
+            }
+
+            // Stop the timer.
+            timer.stop();
+            placementPolicy.close();
+
+            // clean up gauge
+            unregisterGauge();
+
+            // shutdown the executor after requesting closing streams.
+            SchedulerUtils.shutdownScheduler(scheduler, 60, TimeUnit.SECONDS);
+        } catch (Exception ex) {
+            // Keep shutdown best-effort, but record the failure and its cause so that an
+            // incomplete shutdown can be diagnosed.
+            logger.error("Exception while shutting down distributedlog service.", ex);
+        } finally {
+            // release the keepAliveLatch in case shutdown is called from a shutdown hook.
+            keepAliveLatch.countDown();
+            logger.info("Finished shutting down distributedlog service.");
+        }
+    }
+
+    protected void startPlacementPolicy() {
+        this.placementPolicy.start(shard == 0);
+    }
+
+    /**
+     * React to an unrecoverable error by triggering a shutdown of the service.
+     */
+    @Override
+    public void notifyFatalError() {
+        this.triggerShutdown();
+    }
+
+    /**
+     * Count down the keep-alive latch. Per the log messages below, this lets the main
+     * thread shut the whole service down.
+     */
+    private void triggerShutdown() {
+        logger.info("Releasing KeepAlive Latch to trigger shutdown ...");
+        this.keepAliveLatch.countDown();
+        logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
+    }
+
+    // Test methods.
+
+    /**
+     * Resolve the dynamic configuration for a stream.
+     *
+     * @param streamName stream name
+     * @return the stream's dynamic configuration when the provider has one, otherwise a
+     *         constant dynamic configuration derived from {@code dlConfig}
+     */
+    private DynamicDistributedLogConfiguration getDynConf(String streamName) {
+        Optional<DynamicDistributedLogConfiguration> perStreamConf =
+                streamConfigProvider.getDynamicStreamConfig(streamName);
+        if (!perStreamConf.isPresent()) {
+            return ConfUtils.getConstDynConf(dlConfig);
+        }
+        return perStreamConf.get();
+    }
+
+    /**
+     * Unregister the gauges owned by this service from the stats logger before closing,
+     * to help GC.
+     *
+     * <p>NOTE(review): the "acquired" and "cached" gauges are registered on
+     * {@code streamsStatsLogger} during setup, but are unregistered from {@code statsLogger}
+     * here. Confirm both refer to the same scope; otherwise these two unregister calls may
+     * not remove the gauges.
+     */
+    private void unregisterGauge() {
+        // server-level gauges
+        statsLogger.unregisterGauge("proxy_status", proxyStatusGauge);
+        statsLogger.unregisterGauge("moving_avg_rps", movingAvgRpsGauge);
+        statsLogger.unregisterGauge("moving_avg_bps", movingAvgBpsGauge);
+        // stream-level gauges
+        statsLogger.unregisterGauge("acquired", streamAcquiredGauge);
+        statsLogger.unregisterGauge("cached", streamCachedGauge);
+    }
+
+    @VisibleForTesting
+    Stream newStream(String name) throws IOException {
+        return streamManager.getOrCreateStream(name, false);
+    }
+
+    @VisibleForTesting
+    WriteOp newWriteOp(String stream, ByteBuffer data, Long checksum) {
+        return newWriteOp(stream, data, checksum, false);
+    }
+
+    /**
+     * Expose the routing service held by this instance.
+     *
+     * @return the routing service
+     */
+    @VisibleForTesting
+    RoutingService getRoutingService() {
+        return routingService;
+    }
+
+    @VisibleForTesting
+    DLSocketAddress getServiceAddress() throws IOException {
+        return DLSocketAddress.deserialize(clientId);
+    }
+
+    /**
+     * Build a write op wired with this service's stats loggers, partition converter,
+     * server configuration, DLSN version, checksum feature and access control manager.
+     *
+     * @param stream stream name
+     * @param data record payload
+     * @param checksum request checksum, or {@code null} when none was supplied
+     * @param isRecordSet whether the payload is a record set
+     * @return the new write op
+     */
+    WriteOp newWriteOp(String stream,
+                       ByteBuffer data,
+                       Long checksum,
+                       boolean isRecordSet) {
+        return new WriteOp(
+            stream,
+            data,
+            statsLogger,
+            perStreamStatsLogger,
+            streamPartitionConverter,
+            serverConfig,
+            dlsnVersion,
+            checksum,
+            isRecordSet,
+            featureChecksumDisabled,
+            accessControlManager);
+    }
+
+    @VisibleForTesting
+    Future<List<Void>> closeStreams() {
+        return streamManager.closeStreams();
+    }
+
+    /**
+     * Expose the DistributedLog namespace used by this service.
+     *
+     * @return the namespace
+     */
+    @VisibleForTesting
+    public DistributedLogNamespace getDistributedLogNamespace() {
+        return this.dlNamespace;
+    }
+
+    /**
+     * Expose the stream manager used by this service.
+     *
+     * @return the stream manager
+     */
+    @VisibleForTesting
+    StreamManager getStreamManager() {
+        return this.streamManager;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
new file mode 100644
index 0000000..17b5ab3
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+/**
+ * Callback for reacting to an unrecoverable error.
+ */
+public interface FatalErrorHandler {
+
+    /**
+     * Called once an unrecoverable error has occurred and no further progress is possible.
+     * Implementations are expected to carry out a shutdown routine.
+     */
+    void notifyFatalError();
+}