You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:16 UTC

[09/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
deleted file mode 100644
index 267f75a..0000000
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stats;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.health.HealthCheckRegistry;
-import com.codahale.metrics.servlets.AdminServlet;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-
-/**
- * Starts a jetty server on a configurable port to export stats.
- */
-public class ServletReporter {
-
-    private final MetricRegistry metricRegistry;
-    private final HealthCheckRegistry healthCheckRegistry;
-    private final int port;
-    private final Server jettyServer;
-
-    public ServletReporter(MetricRegistry metricRegistry,
-                           HealthCheckRegistry healthCheckRegistry,
-                           int port) {
-        this.metricRegistry = metricRegistry;
-        this.healthCheckRegistry = healthCheckRegistry;
-        this.port = port;
-        this.jettyServer = new Server(port);
-    }
-
-    public void start() throws Exception {
-        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath("/");
-        jettyServer.setHandler(context);
-
-        context.addEventListener(new HealthCheckServletContextListener(healthCheckRegistry));
-        context.addEventListener(new MetricsServletContextListener(metricRegistry));
-        context.addServlet(new ServletHolder(new AdminServlet()), "/*");
-
-        jettyServer.start();
-    }
-
-    public void stop() throws Exception {
-        jettyServer.stop();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
deleted file mode 100644
index 5bdb3ce..0000000
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Extension of {@link org.apache.bookkeeper.stats.CodahaleMetricsProvider}.
- */
-package org.apache.bookkeeper.stats;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java
deleted file mode 100644
index 96bc338..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.client.DistributedLogClientImpl;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.commons.lang3.tuple.Pair;
-
-/**
- * DistributedLog Client Related Utils.
- */
-public class ClientUtils {
-
-    public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) {
-        DistributedLogClientImpl clientImpl = builder.buildClient();
-        return Pair.of((DistributedLogClient) clientImpl, (MonitorServiceClient) clientImpl);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
deleted file mode 100644
index 9cc085d..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.LocalDLMEmulator;
-import org.apache.distributedlog.client.routing.SingleHostRoutingService;
-import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.metadata.DLMetadata;
-import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import com.twitter.finagle.builder.Server;
-import java.io.File;
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * DistributedLog Cluster is an emulator to run distributedlog components.
- */
-public class DistributedLogCluster {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DistributedLogCluster.class);
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder to build distributedlog cluster.
-     */
-    public static class Builder {
-
-        int numBookies = 3;
-        boolean shouldStartZK = true;
-        String zkHost = "127.0.0.1";
-        int zkPort = 0;
-        boolean shouldStartProxy = true;
-        int proxyPort = 7000;
-        boolean thriftmux = false;
-        DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
-                .setLockTimeout(10)
-                .setOutputBufferSize(0)
-                .setImmediateFlushEnabled(true);
-        ServerConfiguration bkConf = new ServerConfiguration();
-
-        private Builder() {}
-
-        /**
-         * Number of bookies to run. The default is 3.
-         *
-         * @return builder
-         */
-        public Builder numBookies(int numBookies) {
-            this.numBookies = numBookies;
-            return this;
-        }
-
-        /**
-         * Whether to start zookeeper. The default is true.
-         *
-         * @param startZK
-         *          flag indicating whether to start zookeeper
-         * @return builder
-         */
-        public Builder shouldStartZK(boolean startZK) {
-            this.shouldStartZK = startZK;
-            return this;
-        }
-
-        /**
-         * ZooKeeper server to run. By default it runs locally on '127.0.0.1'.
-         *
-         * @param zkServers
-         *          zk servers
-         * @return builder
-         */
-        public Builder zkServers(String zkServers) {
-            this.zkHost = zkServers;
-            return this;
-        }
-
-        /**
-         * ZooKeeper server port to listen on. The default is 0, in which case a locally started ZooKeeper runs on any available port.
-         *
-         * @param zkPort
-         *          zookeeper server port.
-         * @return builder.
-         */
-        public Builder zkPort(int zkPort) {
-            this.zkPort = zkPort;
-            return this;
-        }
-
-        /**
-         * Whether to start the proxy or not. The default is true.
-         *
-         * @param startProxy
-         *          whether to start proxy or not.
-         * @return builder
-         */
-        public Builder shouldStartProxy(boolean startProxy) {
-            this.shouldStartProxy = startProxy;
-            return this;
-        }
-
-        /**
-         * Port for the proxy server to listen on. The default is 7000.
-         *
-         * @param proxyPort
-         *          port for the proxy server to listen on.
-         * @return builder
-         */
-        public Builder proxyPort(int proxyPort) {
-            this.proxyPort = proxyPort;
-            return this;
-        }
-
-        /**
-         * Set the distributedlog configuration.
-         *
-         * @param dlConf
-         *          distributedlog configuration
-         * @return builder
-         */
-        public Builder dlConf(DistributedLogConfiguration dlConf) {
-            this.dlConf = dlConf;
-            return this;
-        }
-
-        /**
-         * Set the Bookkeeper server configuration.
-         *
-         * @param bkConf
-         *          bookkeeper server configuration
-         * @return builder
-         */
-        public Builder bkConf(ServerConfiguration bkConf) {
-            this.bkConf = bkConf;
-            return this;
-        }
-
-        /**
-         * Enable thriftmux for the dl server.
-         *
-         * @param enabled flag to enable thriftmux
-         * @return builder
-         */
-        public Builder thriftmux(boolean enabled) {
-            this.thriftmux = enabled;
-            return this;
-        }
-
-        public DistributedLogCluster build() throws Exception {
-            // build the cluster
-            return new DistributedLogCluster(
-                dlConf,
-                bkConf,
-                numBookies,
-                shouldStartZK,
-                zkHost,
-                zkPort,
-                shouldStartProxy,
-                proxyPort,
-                thriftmux);
-        }
-    }
-
-    /**
-     * Run a distributedlog proxy server.
-     */
-    public static class DLServer {
-
-        static final int MAX_RETRIES = 20;
-        static final int MIN_PORT = 1025;
-        static final int MAX_PORT = 65535;
-
-        int proxyPort;
-
-        public final InetSocketAddress address;
-        public final Pair<DistributedLogServiceImpl, Server> dlServer;
-        private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null);
-
-        protected DLServer(DistributedLogConfiguration dlConf,
-                           URI uri,
-                           int basePort,
-                           boolean thriftmux) throws Exception {
-            proxyPort = basePort;
-
-            boolean success = false;
-            int retries = 0;
-            Pair<DistributedLogServiceImpl, Server> serverPair = null;
-            while (!success) {
-                try {
-                    org.apache.distributedlog.service.config.ServerConfiguration serverConf =
-                            new org.apache.distributedlog.service.config.ServerConfiguration();
-                    serverConf.loadConf(dlConf);
-                    serverConf.setServerShardId(proxyPort);
-                    serverPair = DistributedLogServer.runServer(
-                            serverConf,
-                            dlConf,
-                            uri,
-                            new IdentityStreamPartitionConverter(),
-                            routingService,
-                            new NullStatsProvider(),
-                            proxyPort,
-                            thriftmux,
-                            new EqualLoadAppraiser());
-                    routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
-                    routingService.startService();
-                    serverPair.getLeft().startPlacementPolicy();
-                    success = true;
-                } catch (BindException be) {
-                    retries++;
-                    if (retries > MAX_RETRIES) {
-                        throw be;
-                    }
-                    proxyPort++;
-                    if (proxyPort > MAX_PORT) {
-                        proxyPort = MIN_PORT;
-                    }
-                }
-            }
-
-            LOG.info("Running DL on port {}", proxyPort);
-
-            dlServer = serverPair;
-            address = DLSocketAddress.getSocketAddress(proxyPort);
-        }
-
-        public InetSocketAddress getAddress() {
-            return address;
-        }
-
-        public void shutdown() {
-            DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS);
-            routingService.stopService();
-        }
-    }
-
-    private final DistributedLogConfiguration dlConf;
-    private final ZooKeeperServerShim zks;
-    private final LocalDLMEmulator dlmEmulator;
-    private DLServer dlServer;
-    private final boolean shouldStartProxy;
-    private final int proxyPort;
-    private final boolean thriftmux;
-    private final List<File> tmpDirs = new ArrayList<File>();
-
-    private DistributedLogCluster(DistributedLogConfiguration dlConf,
-                                  ServerConfiguration bkConf,
-                                  int numBookies,
-                                  boolean shouldStartZK,
-                                  String zkServers,
-                                  int zkPort,
-                                  boolean shouldStartProxy,
-                                  int proxyPort,
-                                  boolean thriftmux) throws Exception {
-        this.dlConf = dlConf;
-        if (shouldStartZK) {
-            File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
-            tmpDirs.add(zkTmpDir);
-            if (0 == zkPort) {
-                Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
-                this.zks = serverAndPort.getLeft();
-                zkPort = serverAndPort.getRight();
-            } else {
-                this.zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
-            }
-        } else {
-            this.zks = null;
-        }
-        this.dlmEmulator = LocalDLMEmulator.newBuilder()
-                .numBookies(numBookies)
-                .zkHost(zkServers)
-                .zkPort(zkPort)
-                .serverConf(bkConf)
-                .shouldStartZK(false)
-                .build();
-        this.shouldStartProxy = shouldStartProxy;
-        this.proxyPort = proxyPort;
-        this.thriftmux = thriftmux;
-    }
-
-    public void start() throws Exception {
-        this.dlmEmulator.start();
-        BKDLConfig bkdlConfig = new BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl");
-        DLMetadata.create(bkdlConfig).update(this.dlmEmulator.getUri());
-        if (shouldStartProxy) {
-            this.dlServer = new DLServer(
-                    dlConf,
-                    this.dlmEmulator.getUri(),
-                    proxyPort,
-                    thriftmux);
-        } else {
-            this.dlServer = null;
-        }
-    }
-
-    public void stop() throws Exception {
-        if (null != dlServer) {
-            this.dlServer.shutdown();
-        }
-        this.dlmEmulator.teardown();
-        if (null != this.zks) {
-            this.zks.stop();
-        }
-        for (File dir : tmpDirs) {
-            FileUtils.forceDeleteOnExit(dir);
-        }
-    }
-
-    public URI getUri() {
-        return this.dlmEmulator.getUri();
-    }
-
-    public String getZkServers() {
-        return this.dlmEmulator.getZkServers();
-    }
-
-    public String getProxyFinagleStr() {
-        return "inet!" + (dlServer == null ? "127.0.0.1:" + proxyPort : dlServer.getAddress().toString());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
deleted file mode 100644
index 81e476b..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.config.DynamicConfigurationFactory;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.service.announcer.Announcer;
-import org.apache.distributedlog.service.announcer.NOPAnnouncer;
-import org.apache.distributedlog.service.announcer.ServerSetAnnouncer;
-import org.apache.distributedlog.service.config.DefaultStreamConfigProvider;
-import org.apache.distributedlog.service.config.NullStreamConfigProvider;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.config.ServiceStreamConfigProvider;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
-import org.apache.distributedlog.service.placement.LoadAppraiser;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.SchedulerUtils;
-import com.twitter.finagle.Stack;
-import com.twitter.finagle.ThriftMuxServer$;
-import com.twitter.finagle.builder.Server;
-import com.twitter.finagle.builder.ServerBuilder;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientIdRequiredFilter;
-import com.twitter.finagle.thrift.ThriftServerFramedCodec;
-import com.twitter.finagle.transport.Transport;
-import com.twitter.util.Duration;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-
-/**
- * Running the distributedlog proxy server.
- */
-public class DistributedLogServer {
-
-    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
-    private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
-
-    private DistributedLogServiceImpl dlService = null;
-    private Server server = null;
-    private RoutingService routingService;
-    private StatsProvider statsProvider;
-    private Announcer announcer = null;
-    private ScheduledExecutorService configExecutorService;
-    private long gracefulShutdownMs = 0L;
-
-    private final StatsReceiver statsReceiver;
-    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
-    private final Optional<String> uri;
-    private final Optional<String> conf;
-    private final Optional<String> streamConf;
-    private final Optional<Integer> port;
-    private final Optional<Integer> statsPort;
-    private final Optional<Integer> shardId;
-    private final Optional<Boolean> announceServerSet;
-    private final Optional<String> loadAppraiserClassStr;
-    private final Optional<Boolean> thriftmux;
-
-    DistributedLogServer(Optional<String> uri,
-                         Optional<String> conf,
-                         Optional<String> streamConf,
-                         Optional<Integer> port,
-                         Optional<Integer> statsPort,
-                         Optional<Integer> shardId,
-                         Optional<Boolean> announceServerSet,
-                         Optional<String> loadAppraiserClass,
-                         Optional<Boolean> thriftmux,
-                         RoutingService routingService,
-                         StatsReceiver statsReceiver,
-                         StatsProvider statsProvider) {
-        this.uri = uri;
-        this.conf = conf;
-        this.streamConf = streamConf;
-        this.port = port;
-        this.statsPort = statsPort;
-        this.shardId = shardId;
-        this.announceServerSet = announceServerSet;
-        this.thriftmux = thriftmux;
-        this.routingService = routingService;
-        this.statsReceiver = statsReceiver;
-        this.statsProvider = statsProvider;
-        this.loadAppraiserClassStr = loadAppraiserClass;
-    }
-
-    public void runServer()
-        throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
-        if (!uri.isPresent()) {
-            throw new IllegalArgumentException("No distributedlog uri provided.");
-        }
-        URI dlUri = URI.create(uri.get());
-        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
-        if (conf.isPresent()) {
-            String configFile = conf.get();
-            try {
-                dlConf.loadConf(new File(configFile).toURI().toURL());
-            } catch (ConfigurationException e) {
-                throw new IllegalArgumentException("Failed to load distributedlog configuration from "
-                    + configFile + ".");
-            } catch (MalformedURLException e) {
-                throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
-                        + configFile + ".");
-            }
-        }
-
-        this.configExecutorService = Executors.newScheduledThreadPool(1,
-                new ThreadFactoryBuilder()
-                        .setNameFormat("DistributedLogService-Dyncfg-%d")
-                        .setDaemon(true)
-                        .build());
-
-        // server configuration and dynamic configuration
-        ServerConfiguration serverConf = new ServerConfiguration();
-        serverConf.loadConf(dlConf);
-
-        // overwrite the shard id if it is provided in the args
-        if (shardId.isPresent()) {
-            serverConf.setServerShardId(shardId.get());
-        }
-
-        serverConf.validate();
-
-        DynamicDistributedLogConfiguration dynDlConf = getServiceDynConf(dlConf);
-
-        logger.info("Starting stats provider : {}", statsProvider.getClass());
-        statsProvider.start(dlConf);
-
-        if (announceServerSet.isPresent() && announceServerSet.get()) {
-            announcer = new ServerSetAnnouncer(
-                    dlUri,
-                    port.or(0),
-                    statsPort.or(0),
-                    shardId.or(0));
-        } else {
-            announcer = new NOPAnnouncer();
-        }
-
-        // Build the stream partition converter
-        StreamPartitionConverter converter;
-        try {
-            converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
-        } catch (ConfigurationException e) {
-            logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
-                    IdentityStreamPartitionConverter.class.getName());
-            converter = new IdentityStreamPartitionConverter();
-        }
-        Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
-        LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
-        logger.info("Load appraiser class is " + loadAppraiserClassStr.or("not specified.") + " Instantiated "
-                + loadAppraiser.getClass().getCanonicalName());
-
-        StreamConfigProvider streamConfProvider =
-                getStreamConfigProvider(dlConf, converter);
-
-        // pre-run
-        preRun(dlConf, serverConf);
-
-        Pair<DistributedLogServiceImpl, Server> serverPair = runServer(
-                serverConf,
-                dlConf,
-                dynDlConf,
-                dlUri,
-                converter,
-                routingService,
-                statsProvider,
-                port.or(0),
-                keepAliveLatch,
-                statsReceiver,
-                thriftmux.isPresent(),
-                streamConfProvider,
-                loadAppraiser);
-
-        this.dlService = serverPair.getLeft();
-        this.server = serverPair.getRight();
-
-        // announce the service
-        announcer.announce();
-        // start the routing service after announced
-        routingService.startService();
-        logger.info("Started the routing service.");
-        dlService.startPlacementPolicy();
-        logger.info("Started the placement policy.");
-    }
-
-    protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
-        this.gracefulShutdownMs = serverConf.getGracefulShutdownPeriodMs();
-        if (!serverConf.isDurableWriteEnabled()) {
-            conf.setDurableWriteEnabled(false);
-        }
-    }
-
-    private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf)
-        throws ConfigurationException {
-        Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent();
-        if (conf.isPresent()) {
-            DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory(
-                    configExecutorService, dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
-            dynConf = configFactory.getDynamicConfiguration(conf.get());
-        }
-        if (dynConf.isPresent()) {
-            return dynConf.get();
-        } else {
-            return ConfUtils.getConstDynConf(dlConf);
-        }
-    }
-
-    private StreamConfigProvider getStreamConfigProvider(DistributedLogConfiguration dlConf,
-                                                         StreamPartitionConverter partitionConverter)
-            throws ConfigurationException {
-        StreamConfigProvider streamConfProvider = new NullStreamConfigProvider();
-        if (streamConf.isPresent() && conf.isPresent()) {
-            String dynConfigPath = streamConf.get();
-            String defaultConfigFile = conf.get();
-            streamConfProvider = new ServiceStreamConfigProvider(
-                    dynConfigPath,
-                    defaultConfigFile,
-                    partitionConverter,
-                    configExecutorService,
-                    dlConf.getDynamicConfigReloadIntervalSec(),
-                    TimeUnit.SECONDS);
-        } else if (conf.isPresent()) {
-            String configFile = conf.get();
-            streamConfProvider = new DefaultStreamConfigProvider(configFile, configExecutorService,
-                    dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
-        }
-        return streamConfProvider;
-    }
-
-    static Pair<DistributedLogServiceImpl, Server> runServer(
-            ServerConfiguration serverConf,
-            DistributedLogConfiguration dlConf,
-            URI dlUri,
-            StreamPartitionConverter converter,
-            RoutingService routingService,
-            StatsProvider provider,
-            int port,
-            boolean thriftmux,
-            LoadAppraiser loadAppraiser) throws IOException {
-
-        return runServer(serverConf,
-                dlConf,
-                ConfUtils.getConstDynConf(dlConf),
-                dlUri,
-                converter,
-                routingService,
-                provider,
-                port,
-                new CountDownLatch(0),
-                new NullStatsReceiver(),
-                thriftmux,
-                new NullStreamConfigProvider(),
-                loadAppraiser);
-    }
-
-    static Pair<DistributedLogServiceImpl, Server> runServer(
-            ServerConfiguration serverConf,
-            DistributedLogConfiguration dlConf,
-            DynamicDistributedLogConfiguration dynDlConf,
-            URI dlUri,
-            StreamPartitionConverter partitionConverter,
-            RoutingService routingService,
-            StatsProvider provider,
-            int port,
-            CountDownLatch keepAliveLatch,
-            StatsReceiver statsReceiver,
-            boolean thriftmux,
-            StreamConfigProvider streamConfProvider,
-            LoadAppraiser loadAppraiser) throws IOException {
-        logger.info("Running server @ uri {}.", dlUri);
-
-        boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled();
-        StatsLogger perStreamStatsLogger;
-        if (perStreamStatsEnabled) {
-            perStreamStatsLogger = provider.getStatsLogger("stream");
-        } else {
-            perStreamStatsLogger = NullStatsLogger.INSTANCE;
-        }
-
-        // dl service
-        DistributedLogServiceImpl dlService = new DistributedLogServiceImpl(
-            serverConf,
-            dlConf,
-            dynDlConf,
-            streamConfProvider,
-            dlUri,
-            partitionConverter,
-            routingService,
-            provider.getStatsLogger(""),
-            perStreamStatsLogger,
-            keepAliveLatch,
-            loadAppraiser);
-
-        StatsReceiver serviceStatsReceiver = statsReceiver.scope("service");
-        StatsLogger serviceStatsLogger = provider.getStatsLogger("service");
-
-        ServerBuilder serverBuilder = ServerBuilder.get()
-                .name("DistributedLogServer")
-                .codec(ThriftServerFramedCodec.get())
-                .reportTo(statsReceiver)
-                .keepAlive(true)
-                .bindTo(new InetSocketAddress(port));
-
-        if (thriftmux) {
-            logger.info("Using thriftmux.");
-            Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness(
-                    Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk();
-            serverBuilder = serverBuilder.stack(
-                ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
-        }
-
-        logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString());
-
-        // starts dl server
-        Server server = ServerBuilder.safeBuild(
-                new ClientIdRequiredFilter<byte[], byte[]>(serviceStatsReceiver).andThen(
-                    new StatsFilter<byte[], byte[]>(serviceStatsLogger).andThen(
-                        new DistributedLogService.Service(dlService, new TBinaryProtocol.Factory()))),
-                serverBuilder);
-
-        logger.info("Started DistributedLog Server.");
-        return Pair.of(dlService, server);
-    }
-
-    static void closeServer(Pair<DistributedLogServiceImpl, Server> pair,
-                            long gracefulShutdownPeriod,
-                            TimeUnit timeUnit) {
-        if (null != pair.getLeft()) {
-            pair.getLeft().shutdown();
-            if (gracefulShutdownPeriod > 0) {
-                try {
-                    timeUnit.sleep(gracefulShutdownPeriod);
-                } catch (InterruptedException e) {
-                    logger.info("Interrupted on waiting service shutting down state propagated to all clients : ", e);
-                }
-            }
-        }
-        if (null != pair.getRight()) {
-            logger.info("Closing dl thrift server.");
-            pair.getRight().close();
-            logger.info("Closed dl thrift server.");
-        }
-    }
-
-    /**
-     * Close the server.
-     */
-    public void close() {
-        if (null != announcer) {
-            try {
-                announcer.unannounce();
-            } catch (IOException e) {
-                logger.warn("Error on unannouncing service : ", e);
-            }
-            announcer.close();
-        }
-        closeServer(Pair.of(dlService, server), gracefulShutdownMs, TimeUnit.MILLISECONDS);
-        routingService.stopService();
-        if (null != statsProvider) {
-            statsProvider.stop();
-        }
-        SchedulerUtils.shutdownScheduler(configExecutorService, 60, TimeUnit.SECONDS);
-        keepAliveLatch.countDown();
-    }
-
-    public void join() throws InterruptedException {
-        keepAliveLatch.await();
-    }
-
-    /**
-     * Running distributedlog server.
-     *
-     * @param uri distributedlog namespace
-     * @param conf distributedlog configuration file location
-     * @param streamConf per stream configuration dir location
-     * @param port listen port
-     * @param statsPort stats port
-     * @param shardId shard id
-     * @param announceServerSet whether to announce itself to server set
-     * @param thriftmux flag to enable thrift mux
-     * @param statsReceiver receiver to receive finagle stats
-     * @param statsProvider provider to receive dl stats
-     * @return distributedlog server
-     * @throws ConfigurationException
-     * @throws IllegalArgumentException
-     * @throws IOException
-     * @throws ClassNotFoundException
-     */
-    public static DistributedLogServer runServer(
-               Optional<String> uri,
-               Optional<String> conf,
-               Optional<String> streamConf,
-               Optional<Integer> port,
-               Optional<Integer> statsPort,
-               Optional<Integer> shardId,
-               Optional<Boolean> announceServerSet,
-               Optional<String> loadAppraiserClass,
-               Optional<Boolean> thriftmux,
-               RoutingService routingService,
-               StatsReceiver statsReceiver,
-               StatsProvider statsProvider)
-        throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
-
-        final DistributedLogServer server = new DistributedLogServer(
-                uri,
-                conf,
-                streamConf,
-                port,
-                statsPort,
-                shardId,
-                announceServerSet,
-                loadAppraiserClass,
-                thriftmux,
-                routingService,
-                statsReceiver,
-                statsProvider);
-
-        server.runServer();
-        return server;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
deleted file mode 100644
index a1642f9..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.client.routing.RoutingUtils;
-import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The launcher of the distributedlog proxy server.
- */
-public class DistributedLogServerApp {
-
-    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
-
-    private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
-    private final String[] args;
-    private final Options options = new Options();
-
-    private DistributedLogServerApp(String[] args) {
-        this.args = args;
-
-        // prepare options
-        options.addOption("u", "uri", true, "DistributedLog URI");
-        options.addOption("c", "conf", true, "DistributedLog Configuration File");
-        options.addOption("sc", "stream-conf", true, "Per Stream Configuration Directory");
-        options.addOption("p", "port", true, "DistributedLog Server Port");
-        options.addOption("sp", "stats-port", true, "DistributedLog Stats Port");
-        options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
-        options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
-        options.addOption("a", "announce", false, "ServerSet Path to Announce");
-        options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use");
-        options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
-    }
-
-    private void printUsage() {
-        HelpFormatter helpFormatter = new HelpFormatter();
-        helpFormatter.printHelp(USAGE, options);
-    }
-
-    private void run() {
-        try {
-            logger.info("Running distributedlog server : args = {}", Arrays.toString(args));
-            BasicParser parser = new BasicParser();
-            CommandLine cmdline = parser.parse(options, args);
-            runCmd(cmdline);
-        } catch (ParseException pe) {
-            logger.error("Argument error : {}", pe.getMessage());
-            printUsage();
-            Runtime.getRuntime().exit(-1);
-        } catch (IllegalArgumentException iae) {
-            logger.error("Argument error : {}", iae.getMessage());
-            printUsage();
-            Runtime.getRuntime().exit(-1);
-        } catch (ConfigurationException ce) {
-            logger.error("Configuration error : {}", ce.getMessage());
-            printUsage();
-            Runtime.getRuntime().exit(-1);
-        } catch (IOException ie) {
-            logger.error("Failed to start distributedlog server : ", ie);
-            Runtime.getRuntime().exit(-1);
-        } catch (ClassNotFoundException cnf) {
-          logger.error("Failed to start distributedlog server : ", cnf);
-          Runtime.getRuntime().exit(-1);
-        }
-    }
-
-    private void runCmd(CommandLine cmdline)
-        throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
-        final StatsReceiver statsReceiver = NullStatsReceiver.get();
-        Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
-        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
-        if (confOptional.isPresent()) {
-            String configFile = confOptional.get();
-            try {
-                dlConf.loadConf(new File(configFile).toURI().toURL());
-            } catch (ConfigurationException e) {
-                throw new IllegalArgumentException("Failed to load distributedlog configuration from "
-                    + configFile + ".");
-            } catch (MalformedURLException e) {
-                throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
-                        + configFile + ".");
-            }
-        }
-        // load the stats provider
-        final StatsProvider statsProvider = getOptionalStringArg(cmdline, "pd")
-                .transform(new Function<String, StatsProvider>() {
-                    @Nullable
-                    @Override
-                    public StatsProvider apply(@Nullable String name) {
-                        return ReflectionUtils.newInstance(name, StatsProvider.class);
-                    }
-                }).or(new NullStatsProvider());
-
-        final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
-        checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
-        URI dlUri = URI.create(uriOption.get());
-
-        DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));
-        RoutingService routingService = RoutingUtils.buildRoutingService(serverSet.getServerSet())
-                .statsReceiver(statsReceiver.scope("routing"))
-                .build();
-
-        final DistributedLogServer server = DistributedLogServer.runServer(
-                uriOption,
-                confOptional,
-                getOptionalStringArg(cmdline, "sc"),
-                getOptionalIntegerArg(cmdline, "p"),
-                getOptionalIntegerArg(cmdline, "sp"),
-                getOptionalIntegerArg(cmdline, "si"),
-                getOptionalBooleanArg(cmdline, "a"),
-                getOptionalStringArg(cmdline, "la"),
-                getOptionalBooleanArg(cmdline, "mx"),
-                routingService,
-                statsReceiver,
-                statsProvider);
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                logger.info("Closing DistributedLog Server.");
-                server.close();
-                logger.info("Closed DistributedLog Server.");
-                statsProvider.stop();
-            }
-        });
-
-        try {
-            server.join();
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted when waiting distributedlog server to be finished : ", e);
-        }
-
-        logger.info("DistributedLog Service Interrupted.");
-        server.close();
-        logger.info("Closed DistributedLog Server.");
-        statsProvider.stop();
-    }
-
-    public static void main(String[] args) {
-        final DistributedLogServerApp launcher = new DistributedLogServerApp(args);
-        launcher.run();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
deleted file mode 100644
index c37cd53..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
+++ /dev/null
@@ -1,794 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.net.InetSocketAddressHelper;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RegionUnavailableException;
-import org.apache.distributedlog.exceptions.ServiceUnavailableException;
-import org.apache.distributedlog.exceptions.StreamUnavailableException;
-import org.apache.distributedlog.exceptions.TooManyStreamsException;
-import org.apache.distributedlog.feature.AbstractFeatureProvider;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.rate.MovingAverageRate;
-import org.apache.distributedlog.rate.MovingAverageRateFactory;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.placement.LeastLoadPlacementPolicy;
-import org.apache.distributedlog.service.placement.LoadAppraiser;
-import org.apache.distributedlog.service.placement.PlacementPolicy;
-import org.apache.distributedlog.service.placement.ZKPlacementStateManager;
-import org.apache.distributedlog.service.stream.BulkWriteOp;
-import org.apache.distributedlog.service.stream.DeleteOp;
-import org.apache.distributedlog.service.stream.HeartbeatOp;
-import org.apache.distributedlog.service.stream.ReleaseOp;
-import org.apache.distributedlog.service.stream.Stream;
-import org.apache.distributedlog.service.stream.StreamFactory;
-import org.apache.distributedlog.service.stream.StreamFactoryImpl;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.service.stream.StreamManagerImpl;
-import org.apache.distributedlog.service.stream.StreamOp;
-import org.apache.distributedlog.service.stream.StreamOpStats;
-import org.apache.distributedlog.service.stream.TruncateOp;
-import org.apache.distributedlog.service.stream.WriteOp;
-import org.apache.distributedlog.service.stream.WriteOpWithPayload;
-import org.apache.distributedlog.service.stream.admin.CreateOp;
-import org.apache.distributedlog.service.stream.admin.StreamAdminOp;
-import org.apache.distributedlog.service.stream.limiter.ServiceRequestLimiter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.service.utils.ServerUtils;
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.ClientInfo;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import org.apache.distributedlog.thrift.service.HeartbeatOptions;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.ServerInfo;
-import org.apache.distributedlog.thrift.service.ServerStatus;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteContext;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.SchedulerUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.ScheduledThreadPoolTimer;
-import com.twitter.util.Timer;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-/**
- * Implementation of distributedlog thrift service.
- */
-public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
-                                                  FatalErrorHandler {
-
-    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
-
-    private static final int MOVING_AVERAGE_WINDOW_SECS = 60;
-
-    private final ServerConfiguration serverConfig;
-    private final DistributedLogConfiguration dlConfig;
-    private final DistributedLogNamespace dlNamespace;
-    private final int serverRegionId;
-    private final PlacementPolicy placementPolicy;
-    private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
-    private final ReentrantReadWriteLock closeLock =
-            new ReentrantReadWriteLock();
-    private final CountDownLatch keepAliveLatch;
-    private final byte dlsnVersion;
-    private final String clientId;
-    private final OrderedScheduler scheduler;
-    private final AccessControlManager accessControlManager;
-    private final StreamConfigProvider streamConfigProvider;
-    private final StreamManager streamManager;
-    private final StreamFactory streamFactory;
-    private final RoutingService routingService;
-    private final RegionResolver regionResolver;
-    private final MovingAverageRateFactory movingAvgFactory;
-    private final MovingAverageRate windowedRps;
-    private final MovingAverageRate windowedBps;
-    private final ServiceRequestLimiter limiter;
-    private final Timer timer;
-    private final HashedWheelTimer requestTimer;
-
-    // Features
-    private final FeatureProvider featureProvider;
-    private final Feature featureRegionStopAcceptNewStream;
-    private final Feature featureChecksumDisabled;
-    private final Feature limiterDisabledFeature;
-
-    // Stats
-    private final StatsLogger statsLogger;
-    private final StatsLogger perStreamStatsLogger;
-    private final StreamPartitionConverter streamPartitionConverter;
-    private final StreamOpStats streamOpStats;
-    private final Counter bulkWritePendingStat;
-    private final Counter writePendingStat;
-    private final Counter redirects;
-    private final Counter receivedRecordCounter;
-    private final StatsLogger statusCodeStatLogger;
-    private final ConcurrentHashMap<StatusCode, Counter> statusCodeCounters =
-            new ConcurrentHashMap<StatusCode, Counter>();
-    private final Counter statusCodeTotal;
-    private final Gauge<Number> proxyStatusGauge;
-    private final Gauge<Number> movingAvgRpsGauge;
-    private final Gauge<Number> movingAvgBpsGauge;
-    private final Gauge<Number> streamAcquiredGauge;
-    private final Gauge<Number> streamCachedGauge;
-    private final int shard;
-
-    DistributedLogServiceImpl(ServerConfiguration serverConf,
-                              DistributedLogConfiguration dlConf,
-                              DynamicDistributedLogConfiguration dynDlConf,
-                              StreamConfigProvider streamConfigProvider,
-                              URI uri,
-                              StreamPartitionConverter converter,
-                              RoutingService routingService,
-                              StatsLogger statsLogger,
-                              StatsLogger perStreamStatsLogger,
-                              CountDownLatch keepAliveLatch,
-                              LoadAppraiser loadAppraiser)
-            throws IOException {
-        // Configuration.
-        this.serverConfig = serverConf;
-        this.dlConfig = dlConf;
-        this.perStreamStatsLogger = perStreamStatsLogger;
-        this.dlsnVersion = serverConf.getDlsnVersion();
-        this.serverRegionId = serverConf.getRegionId();
-        this.streamPartitionConverter = converter;
-        int serverPort = serverConf.getServerPort();
-        this.shard = serverConf.getServerShardId();
-        int numThreads = serverConf.getServerThreads();
-        this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard);
-        String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName(
-            serverRegionId,
-            shard,
-            serverConf.isUseHostnameAsAllocatorPoolName());
-        dlConf.setLedgerAllocatorPoolName(allocatorPoolName);
-        this.featureProvider = AbstractFeatureProvider.getFeatureProvider("", dlConf, statsLogger.scope("features"));
-        if (this.featureProvider instanceof AbstractFeatureProvider) {
-            ((AbstractFeatureProvider) featureProvider).start();
-        }
-
-        // Build the namespace
-        this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
-                .conf(dlConf)
-                .uri(uri)
-                .statsLogger(statsLogger)
-                .featureProvider(this.featureProvider)
-                .clientId(clientId)
-                .regionId(serverRegionId)
-                .build();
-        this.accessControlManager = this.dlNamespace.createAccessControlManager();
-        this.keepAliveLatch = keepAliveLatch;
-        this.streamConfigProvider = streamConfigProvider;
-
-        // Stats pertaining to stream op execution
-        this.streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
-
-        // Executor Service.
-        this.scheduler = OrderedScheduler.newBuilder()
-                .corePoolSize(numThreads)
-                .name("DistributedLogService-Executor")
-                .traceTaskExecution(true)
-                .statsLogger(statsLogger.scope("scheduler"))
-                .build();
-
-        // Timer, kept separate to ensure reliability of timeouts.
-        this.requestTimer = new HashedWheelTimer(
-            new ThreadFactoryBuilder().setNameFormat("DLServiceTimer-%d").build(),
-            dlConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
-            dlConf.getTimeoutTimerNumTicks());
-
-        // Creating and managing Streams
-        this.streamFactory = new StreamFactoryImpl(clientId,
-                streamOpStats,
-                serverConf,
-                dlConf,
-                featureProvider,
-                streamConfigProvider,
-                converter,
-                dlNamespace,
-                scheduler,
-                this,
-                requestTimer);
-        this.streamManager = new StreamManagerImpl(
-                clientId,
-                dlConf,
-                scheduler,
-                streamFactory,
-                converter,
-                streamConfigProvider,
-                dlNamespace);
-        this.routingService = routingService;
-        this.regionResolver = new DefaultRegionResolver();
-
-        // Service features
-        this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature(
-                ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase());
-        this.featureChecksumDisabled = this.featureProvider.getFeature(
-                ServerFeatureKeys.SERVICE_CHECKSUM_DISABLED.name().toLowerCase());
-        this.limiterDisabledFeature = this.featureProvider.getFeature(
-                ServerFeatureKeys.SERVICE_GLOBAL_LIMITER_DISABLED.name().toLowerCase());
-
-        // Resource limiting
-        this.timer = new ScheduledThreadPoolTimer(1, "timer", true);
-        this.movingAvgFactory = new MovingAverageRateFactory(timer);
-        this.windowedRps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
-        this.windowedBps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
-        this.limiter = new ServiceRequestLimiter(
-                dynDlConf,
-                streamOpStats.baseScope("service_limiter"),
-                windowedRps,
-                windowedBps,
-                streamManager,
-                limiterDisabledFeature);
-
-        this.placementPolicy = new LeastLoadPlacementPolicy(
-            loadAppraiser,
-            routingService,
-            dlNamespace,
-            new ZKPlacementStateManager(uri, dlConf, statsLogger),
-            Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()),
-            statsLogger);
-        logger.info("placement started");
-
-        // Stats
-        this.statsLogger = statsLogger;
-
-        // Gauges for server status/health
-        this.proxyStatusGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable()
-                    ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
-            }
-        };
-        this.movingAvgRpsGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return windowedRps.get();
-            }
-        };
-        this.movingAvgBpsGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return windowedBps.get();
-            }
-        };
-        // Gauges for streams
-        this.streamAcquiredGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return streamManager.numAcquired();
-            }
-        };
-        this.streamCachedGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return streamManager.numCached();
-            }
-        };
-
-        // Stats on server
-        statsLogger.registerGauge("proxy_status", proxyStatusGauge);
-        // Global moving average rps
-        statsLogger.registerGauge("moving_avg_rps", movingAvgRpsGauge);
-        // Global moving average bps
-        statsLogger.registerGauge("moving_avg_bps", movingAvgBpsGauge);
-        // Stats on requests
-        this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending");
-        this.writePendingStat = streamOpStats.requestPendingCounter("writePending");
-        this.redirects = streamOpStats.requestCounter("redirect");
-        this.statusCodeStatLogger = streamOpStats.requestScope("statuscode");
-        this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count");
-        this.receivedRecordCounter = streamOpStats.recordsCounter("received");
-
-        // Stats for streams
-        StatsLogger streamsStatsLogger = statsLogger.scope("streams");
-        streamsStatsLogger.registerGauge("acquired", this.streamAcquiredGauge);
-        streamsStatsLogger.registerGauge("cached", this.streamCachedGauge);
-
-        // Setup complete
-        logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {},"
-            + " dlsn version {}.",
-            new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
-    }
-
-    private void countStatusCode(StatusCode code) {
-        Counter counter = statusCodeCounters.get(code);
-        if (null == counter) {
-            counter = statusCodeStatLogger.getCounter(code.name());
-            Counter oldCounter = statusCodeCounters.putIfAbsent(code, counter);
-            if (null != oldCounter) {
-                counter = oldCounter;
-            }
-        }
-        counter.inc();
-        statusCodeTotal.inc();
-    }
-
-    @Override
-    public Future<ServerInfo> handshake() {
-        return handshakeWithClientInfo(new ClientInfo());
-    }
-
-    @Override
-    public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
-        ServerInfo serverInfo = new ServerInfo();
-        closeLock.readLock().lock();
-        try {
-            serverInfo.setServerStatus(serverStatus);
-        } finally {
-            closeLock.readLock().unlock();
-        }
-
-        if (clientInfo.isSetGetOwnerships() && !clientInfo.isGetOwnerships()) {
-            return Future.value(serverInfo);
-        }
-
-        Optional<String> regex = Optional.absent();
-        if (clientInfo.isSetStreamNameRegex()) {
-            regex = Optional.of(clientInfo.getStreamNameRegex());
-        }
-
-        Map<String, String> ownershipMap = streamManager.getStreamOwnershipMap(regex);
-        serverInfo.setOwnerships(ownershipMap);
-        return Future.value(serverInfo);
-    }
-
-    @VisibleForTesting
-    Stream getLogWriter(String stream) throws IOException {
-        Stream writer = streamManager.getStream(stream);
-        if (null == writer) {
-            closeLock.readLock().lock();
-            try {
-                if (featureRegionStopAcceptNewStream.isAvailable()) {
-                    // accept new stream is disabled in current dc
-                    throw new RegionUnavailableException("Region is unavailable right now.");
-                } else if (!(ServerStatus.WRITE_AND_ACCEPT == serverStatus)) {
-                    // if it is closed, we would not acquire stream again.
-                    return null;
-                }
-                writer = streamManager.getOrCreateStream(stream, true);
-            } finally {
-                closeLock.readLock().unlock();
-            }
-        }
-        return writer;
-    }
-
-    // Service interface methods
-
-    @Override
-    public Future<WriteResponse> write(final String stream, ByteBuffer data) {
-        receivedRecordCounter.inc();
-        return doWrite(stream, data, null /* checksum */, false);
-    }
-
-    @Override
-    public Future<BulkWriteResponse> writeBulkWithContext(final String stream,
-                                                          List<ByteBuffer> data,
-                                                          WriteContext ctx) {
-        bulkWritePendingStat.inc();
-        receivedRecordCounter.add(data.size());
-        BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
-            getChecksum(ctx), featureChecksumDisabled, accessControlManager);
-        executeStreamOp(op);
-        return op.result().ensure(new Function0<BoxedUnit>() {
-            public BoxedUnit apply() {
-                bulkWritePendingStat.dec();
-                return null;
-            }
-        });
-    }
-
-    @Override
-    public Future<WriteResponse> writeWithContext(final String stream, ByteBuffer data, WriteContext ctx) {
-        return doWrite(stream, data, getChecksum(ctx), ctx.isIsRecordSet());
-    }
-
-    @Override
-    public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
-        HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx),
-            featureChecksumDisabled, accessControlManager);
-        executeStreamOp(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<WriteResponse> heartbeatWithOptions(String stream, WriteContext ctx, HeartbeatOptions options) {
-        HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx),
-            featureChecksumDisabled, accessControlManager);
-        if (options.isSendHeartBeatToReader()) {
-            op.setWriteControlRecord(true);
-        }
-        executeStreamOp(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) {
-        TruncateOp op = new TruncateOp(
-            stream,
-            DLSN.deserialize(dlsn),
-            statsLogger,
-            perStreamStatsLogger,
-            getChecksum(ctx),
-            featureChecksumDisabled,
-            accessControlManager);
-        executeStreamOp(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<WriteResponse> delete(String stream, WriteContext ctx) {
-        DeleteOp op = new DeleteOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx),
-            featureChecksumDisabled, accessControlManager);
-        executeStreamOp(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<WriteResponse> release(String stream, WriteContext ctx) {
-        ReleaseOp op = new ReleaseOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx),
-            featureChecksumDisabled, accessControlManager);
-        executeStreamOp(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<WriteResponse> create(String stream, WriteContext ctx) {
-        CreateOp op = new CreateOp(stream, statsLogger, streamManager, getChecksum(ctx), featureChecksumDisabled);
-        return executeStreamAdminOp(op);
-    }
-
-    //
-    // Ownership RPC
-    //
-
-    @Override
-    public Future<WriteResponse> getOwner(String streamName, WriteContext ctx) {
-        if (streamManager.isAcquired(streamName)) {
-            // the stream is already acquired
-            return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId)));
-        }
-
-        return placementPolicy.placeStream(streamName).map(new Function<String, WriteResponse>() {
-            @Override
-            public WriteResponse apply(String server) {
-                String host = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1);
-                return new WriteResponse(ResponseUtils.ownerToHeader(host));
-            }
-        });
-    }
-
-
-    //
-    // Admin RPCs
-    //
-
-    @Override
-    public Future<Void> setAcceptNewStream(boolean enabled) {
-        closeLock.writeLock().lock();
-        try {
-            logger.info("Set AcceptNewStream = {}", enabled);
-            if (ServerStatus.DOWN != serverStatus) {
-                if (enabled) {
-                    serverStatus = ServerStatus.WRITE_AND_ACCEPT;
-                } else {
-                    serverStatus = ServerStatus.WRITE_ONLY;
-                }
-            }
-        } finally {
-            closeLock.writeLock().unlock();
-        }
-        return Future.Void();
-    }
-
-    private Future<WriteResponse> doWrite(final String name,
-                                          ByteBuffer data,
-                                          Long checksum,
-                                          boolean isRecordSet) {
-        writePendingStat.inc();
-        receivedRecordCounter.inc();
-        WriteOp op = newWriteOp(name, data, checksum, isRecordSet);
-        executeStreamOp(op);
-        return op.result().ensure(new Function0<BoxedUnit>() {
-            public BoxedUnit apply() {
-                writePendingStat.dec();
-                return null;
-            }
-        });
-    }
-
-    private Long getChecksum(WriteContext ctx) {
-        return ctx.isSetCrc32() ? ctx.getCrc32() : null;
-    }
-
-    private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) {
-        try {
-            op.preExecute();
-        } catch (DLException dle) {
-            return Future.exception(dle);
-        }
-        return op.execute();
-    }
-
-    private void executeStreamOp(final StreamOp op) {
-
-        // Must attach this as early as possible--returning before this point will cause us to
-        // lose the status code.
-        op.responseHeader().addEventListener(new FutureEventListener<ResponseHeader>() {
-            @Override
-            public void onSuccess(ResponseHeader header) {
-                if (header.getLocation() != null || header.getCode() == StatusCode.FOUND) {
-                    redirects.inc();
-                }
-                countStatusCode(header.getCode());
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-            }
-        });
-
-        try {
-            // Apply the request limiter
-            limiter.apply(op);
-
-            // Execute per-op pre-exec code
-            op.preExecute();
-
-        } catch (TooManyStreamsException e) {
-            // Translate to StreamUnavailableException to ensure that the client will redirect
-            // to a different host. Ideally we would be able to return TooManyStreamsException,
-            // but the way exception handling works right now we can't control the handling in
-            // the client because client changes deploy very slowly.
-            op.fail(new StreamUnavailableException(e.getMessage()));
-            return;
-        } catch (Exception e) {
-            op.fail(e);
-            return;
-        }
-
-        Stream stream;
-        try {
-            stream = getLogWriter(op.streamName());
-        } catch (RegionUnavailableException rue) {
-            // redirect the requests to other region
-            op.fail(new RegionUnavailableException("Region " + serverRegionId + " is unavailable."));
-            return;
-        } catch (IOException e) {
-            op.fail(e);
-            return;
-        }
-        if (null == stream) {
-            // redirect the requests when stream is unavailable.
-            op.fail(new ServiceUnavailableException("Server " + clientId + " is closed."));
-            return;
-        }
-
-        if (op instanceof WriteOpWithPayload) {
-            WriteOpWithPayload writeOp = (WriteOpWithPayload) op;
-            windowedBps.add(writeOp.getPayloadSize());
-            windowedRps.inc();
-        }
-
-        stream.submit(op);
-    }
-
-    void shutdown() {
-        try {
-            closeLock.writeLock().lock();
-            try {
-                if (ServerStatus.DOWN == serverStatus) {
-                    return;
-                }
-                serverStatus = ServerStatus.DOWN;
-            } finally {
-                closeLock.writeLock().unlock();
-            }
-
-            streamManager.close();
-            movingAvgFactory.close();
-            limiter.close();
-
-            Stopwatch closeStreamsStopwatch = Stopwatch.createStarted();
-
-            Future<List<Void>> closeResult = streamManager.closeStreams();
-            logger.info("Waiting for closing all streams ...");
-            try {
-                Await.result(closeResult, Duration.fromTimeUnit(5, TimeUnit.MINUTES));
-                logger.info("Closed all streams in {} millis.",
-                        closeStreamsStopwatch.elapsed(TimeUnit.MILLISECONDS));
-            } catch (InterruptedException e) {
-                logger.warn("Interrupted on waiting for closing all streams : ", e);
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                logger.warn("Sorry, we didn't close all streams gracefully in 5 minutes : ", e);
-            }
-
-            // shutdown the dl namespace
-            logger.info("Closing distributedlog namespace ...");
-            dlNamespace.close();
-            logger.info("Closed distributedlog namespace .");
-
-            // Stop the feature provider
-            if (this.featureProvider instanceof AbstractFeatureProvider) {
-                ((AbstractFeatureProvider) featureProvider).stop();
-            }
-
-            // Stop the timer.
-            timer.stop();
-            placementPolicy.close();
-
-            // clean up gauge
-            unregisterGauge();
-
-            // shutdown the executor after requesting closing streams.
-            SchedulerUtils.shutdownScheduler(scheduler, 60, TimeUnit.SECONDS);
-        } catch (Exception ex) {
-            logger.info("Exception while shutting down distributedlog service.");
-        } finally {
-            // release the keepAliveLatch in case shutdown is called from a shutdown hook.
-            keepAliveLatch.countDown();
-            logger.info("Finished shutting down distributedlog service.");
-        }
-    }
-
-    protected void startPlacementPolicy() {
-        this.placementPolicy.start(shard == 0);
-    }
-
-    @Override
-    public void notifyFatalError() {
-        triggerShutdown();
-    }
-
-    private void triggerShutdown() {
-        // release the keepAliveLatch to let the main thread shutdown the whole service.
-        logger.info("Releasing KeepAlive Latch to trigger shutdown ...");
-        keepAliveLatch.countDown();
-        logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
-    }
-
-    // Test methods.
-
-    private DynamicDistributedLogConfiguration getDynConf(String streamName) {
-        Optional<DynamicDistributedLogConfiguration> dynDlConf =
-                streamConfigProvider.getDynamicStreamConfig(streamName);
-        if (dynDlConf.isPresent()) {
-            return dynDlConf.get();
-        } else {
-            return ConfUtils.getConstDynConf(dlConfig);
-        }
-    }
-
-    /**
-     * clean up the gauge before we close to help GC.
-     */
-    private void unregisterGauge(){
-        this.statsLogger.unregisterGauge("proxy_status", this.proxyStatusGauge);
-        this.statsLogger.unregisterGauge("moving_avg_rps", this.movingAvgRpsGauge);
-        this.statsLogger.unregisterGauge("moving_avg_bps", this.movingAvgBpsGauge);
-        this.statsLogger.unregisterGauge("acquired", this.streamAcquiredGauge);
-        this.statsLogger.unregisterGauge("cached", this.streamCachedGauge);
-    }
-
-    @VisibleForTesting
-    Stream newStream(String name) throws IOException {
-        return streamManager.getOrCreateStream(name, false);
-    }
-
-    @VisibleForTesting
-    WriteOp newWriteOp(String stream, ByteBuffer data, Long checksum) {
-        return newWriteOp(stream, data, checksum, false);
-    }
-
-    @VisibleForTesting
-    RoutingService getRoutingService() {
-        return this.routingService;
-    }
-
-    @VisibleForTesting
-    DLSocketAddress getServiceAddress() throws IOException {
-        return DLSocketAddress.deserialize(clientId);
-    }
-
-    WriteOp newWriteOp(String stream,
-                       ByteBuffer data,
-                       Long checksum,
-                       boolean isRecordSet) {
-        return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
-            serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled,
-            accessControlManager);
-    }
-
-    @VisibleForTesting
-    Future<List<Void>> closeStreams() {
-        return streamManager.closeStreams();
-    }
-
-    @VisibleForTesting
-    public DistributedLogNamespace getDistributedLogNamespace() {
-        return dlNamespace;
-    }
-
-    @VisibleForTesting
-    StreamManager getStreamManager() {
-        return streamManager;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
deleted file mode 100644
index 17b5ab3..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-/**
- * Implement handling for an unrecoverable error.
- */
-public interface FatalErrorHandler {
-
-    /**
-     * This method is invoked when an unrecoverable error has occurred
-     * and no progress can be made. It should implement a shutdown routine.
-     */
-    void notifyFatalError();
-}