You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:14 UTC

[07/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
deleted file mode 100644
index 3c53ccf..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.service.DistributedLogClient;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A balancer balances ownerships between two targets.
- */
-public class SimpleBalancer implements Balancer {
-
-    private static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);
-
-    protected final String target1;
-    protected final String target2;
-    protected final DistributedLogClient targetClient1;
-    protected final DistributedLogClient targetClient2;
-    protected final MonitorServiceClient targetMonitor1;
-    protected final MonitorServiceClient targetMonitor2;
-
-    public SimpleBalancer(String name1,
-                          DistributedLogClient client1,
-                          MonitorServiceClient monitor1,
-                          String name2,
-                          DistributedLogClient client2,
-                          MonitorServiceClient monitor2) {
-        this.target1 = name1;
-        this.targetClient1 = client1;
-        this.targetMonitor1 = monitor1;
-        this.target2 = name2;
-        this.targetClient2 = client2;
-        this.targetMonitor2 = monitor2;
-    }
-
-    protected static int countNumberStreams(Map<SocketAddress, Set<String>> distribution) {
-        int count = 0;
-        for (Set<String> streams : distribution.values()) {
-            count += streams.size();
-        }
-        return count;
-    }
-
-    @Override
-    public void balance(int rebalanceWaterMark,
-                        double rebalanceTolerancePercentage,
-                        int rebalanceConcurrency,
-                        Optional<RateLimiter> rebalanceRateLimiter) {
-        // get the ownership distributions from individual targets
-        Map<SocketAddress, Set<String>> distribution1 = targetMonitor1.getStreamOwnershipDistribution();
-        Map<SocketAddress, Set<String>> distribution2 = targetMonitor2.getStreamOwnershipDistribution();
-
-        // get stream counts
-        int proxyCount1 = distribution1.size();
-        int streamCount1 = countNumberStreams(distribution1);
-        int proxyCount2 = distribution2.size();
-        int streamCount2 = countNumberStreams(distribution2);
-
-        logger.info("'{}' has {} streams by {} proxies; while '{}' has {} streams by {} proxies.",
-                    new Object[] {target1, streamCount1, proxyCount1, target2, streamCount2, proxyCount2 });
-
-        String source, target;
-        Map<SocketAddress, Set<String>> srcDistribution;
-        DistributedLogClient srcClient, targetClient;
-        MonitorServiceClient srcMonitor, targetMonitor;
-        int srcStreamCount, targetStreamCount;
-        if (streamCount1 > streamCount2) {
-            source = target1;
-            srcStreamCount = streamCount1;
-            srcClient = targetClient1;
-            srcMonitor = targetMonitor1;
-            srcDistribution = distribution1;
-
-            target = target2;
-            targetStreamCount = streamCount2;
-            targetClient = targetClient2;
-            targetMonitor = targetMonitor2;
-        } else {
-            source = target2;
-            srcStreamCount = streamCount2;
-            srcClient = targetClient2;
-            srcMonitor = targetMonitor2;
-            srcDistribution = distribution2;
-
-            target = target1;
-            targetStreamCount = streamCount1;
-            targetClient = targetClient1;
-            targetMonitor = targetMonitor1;
-        }
-
-        Map<String, Integer> loadDistribution = new HashMap<String, Integer>();
-        loadDistribution.put(source, srcStreamCount);
-        loadDistribution.put(target, targetStreamCount);
-
-        // Calculate how many streams to be rebalanced from src region to target region
-        int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance(
-            source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);
-
-        if (numStreamsToRebalance <= 0) {
-            logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, target);
-            return;
-        }
-
-        StreamChooser streamChooser =
-                LimitedStreamChooser.of(new CountBasedStreamChooser(srcDistribution), numStreamsToRebalance);
-        StreamMover streamMover =
-            new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);
-
-        moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
-    }
-
-    @Override
-    public void balanceAll(String source,
-                           int rebalanceConcurrency,
-                           Optional<RateLimiter> rebalanceRateLimiter) {
-        String target;
-        DistributedLogClient sourceClient, targetClient;
-        MonitorServiceClient sourceMonitor, targetMonitor;
-        if (target1.equals(source)) {
-            sourceClient = targetClient1;
-            sourceMonitor = targetMonitor1;
-            target = target2;
-            targetClient = targetClient2;
-            targetMonitor = targetMonitor2;
-        } else if (target2.equals(source)) {
-            sourceClient = targetClient2;
-            sourceMonitor = targetMonitor2;
-            target = target1;
-            targetClient = targetClient1;
-            targetMonitor = targetMonitor1;
-        } else {
-            throw new IllegalArgumentException("Unknown target " + source);
-        }
-
-        // get the ownership distributions from individual targets
-        Map<SocketAddress, Set<String>> distribution = sourceMonitor.getStreamOwnershipDistribution();
-
-        if (distribution.isEmpty()) {
-            return;
-        }
-
-        StreamChooser streamChooser = new CountBasedStreamChooser(distribution);
-        StreamMover streamMover =
-            new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);
-
-        moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
-    }
-
-    private void moveStreams(StreamChooser streamChooser,
-                             StreamMover streamMover,
-                             int concurrency,
-                             Optional<RateLimiter> rateLimiter) {
-        CountDownLatch doneLatch = new CountDownLatch(concurrency);
-        RegionMover regionMover = new RegionMover(streamChooser, streamMover, rateLimiter, doneLatch);
-        ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
-        try {
-            for (int i = 0; i < concurrency; i++) {
-                executorService.submit(regionMover);
-            }
-
-            try {
-                doneLatch.await();
-            } catch (InterruptedException e) {
-                logger.info("{} is interrupted. Stopping it ...", streamMover);
-                regionMover.shutdown();
-            }
-        } finally {
-            executorService.shutdown();
-        }
-
-    }
-
-    /**
-     * Move streams from <i>src</i> region to <i>target</i> region.
-     */
-    static class RegionMover implements Runnable {
-
-        final StreamChooser streamChooser;
-        final StreamMover streamMover;
-        final Optional<RateLimiter> rateLimiter;
-        final CountDownLatch doneLatch;
-        volatile boolean running = true;
-
-        RegionMover(StreamChooser streamChooser,
-                    StreamMover streamMover,
-                    Optional<RateLimiter> rateLimiter,
-                    CountDownLatch doneLatch) {
-            this.streamChooser = streamChooser;
-            this.streamMover = streamMover;
-            this.rateLimiter = rateLimiter;
-            this.doneLatch = doneLatch;
-        }
-
-        @Override
-        public void run() {
-            while (running) {
-                if (rateLimiter.isPresent()) {
-                    rateLimiter.get().acquire();
-                }
-
-                String stream = streamChooser.choose();
-                if (null == stream) {
-                    break;
-                }
-
-                streamMover.moveStream(stream);
-            }
-            doneLatch.countDown();
-        }
-
-        void shutdown() {
-            running = false;
-        }
-    }
-
-    @Override
-    public void close() {
-        // no-op
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
deleted file mode 100644
index 1d7b6f7..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-/**
- * Choose a stream to rebalance.
- */
-public interface StreamChooser {
-    /**
-     * Choose a stream to rebalance.
-     *
-     * @return stream chose
-     */
-    String choose();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
deleted file mode 100644
index 4a04530..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-/**
- * A stream mover to move streams between proxies.
- */
-public interface StreamMover {
-
-    /**
-     * Move given stream <i>streamName</i>.
-     *
-     * @param streamName
-     *          stream name to move
-     * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens.
-     * @throws Exception
-     */
-    boolean moveStream(final String streamName);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
deleted file mode 100644
index 68d934b..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.service.DistributedLogClient;
-import com.twitter.util.Await;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Move Streams from <i>src</i> to <i>target</i>.
- */
-public class StreamMoverImpl implements StreamMover {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamMoverImpl.class);
-
-    final String source, target;
-    final DistributedLogClient srcClient, targetClient;
-    final MonitorServiceClient srcMonitor, targetMonitor;
-
-    public StreamMoverImpl(String source, DistributedLogClient srcClient, MonitorServiceClient srcMonitor,
-                           String target, DistributedLogClient targetClient, MonitorServiceClient targetMonitor) {
-        this.source = source;
-        this.srcClient = srcClient;
-        this.srcMonitor = srcMonitor;
-        this.target = target;
-        this.targetClient = targetClient;
-        this.targetMonitor = targetMonitor;
-    }
-
-    /**
-     * Move given stream <i>streamName</i>.
-     *
-     * @param streamName
-     *          stream name to move
-     * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens.
-     * @throws Exception
-     */
-    public boolean moveStream(final String streamName) {
-        try {
-            doMoveStream(streamName);
-            return true;
-        } catch (Exception e) {
-            return false;
-        }
-    }
-
-    private void doMoveStream(final String streamName) throws Exception {
-        Await.result(srcClient.release(streamName).flatMap(new Function<Void, Future<Void>>() {
-            @Override
-            public Future<Void> apply(Void result) {
-                return targetMonitor.check(streamName).addEventListener(new FutureEventListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {
-                        logger.info("Moved stream {} from {} to {}.",
-                                new Object[]{streamName, source, target});
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        logger.info("Failed to move stream {} from region {} to {} : ",
-                                new Object[]{streamName, source, target, cause});
-                    }
-                });
-            }
-        }));
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("StreamMover('").append(source).append("' -> '").append(target).append("')");
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
deleted file mode 100644
index 9eb8950..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Balancer to move streams around to balance the traffic.
- */
-package org.apache.distributedlog.service.balancer;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
deleted file mode 100644
index 7d72093..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
-import org.apache.distributedlog.config.ConfigurationSubscription;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.config.FileConfigurationBuilder;
-import org.apache.distributedlog.config.PropertiesConfigurationBuilder;
-import java.io.File;
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * For all streams return the same dynamic config based on configFile.
- */
-public class DefaultStreamConfigProvider implements StreamConfigProvider {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);
-
-    private final Optional<DynamicDistributedLogConfiguration> dynConf;
-    private final ConfigurationSubscription confSub;
-
-    public DefaultStreamConfigProvider(String configFilePath,
-                                       ScheduledExecutorService executorService,
-                                       int reloadPeriod,
-                                       TimeUnit reloadUnit)
-        throws ConfigurationException {
-        try {
-            File configFile = new File(configFilePath);
-            FileConfigurationBuilder properties =
-                new PropertiesConfigurationBuilder(configFile.toURI().toURL());
-            ConcurrentConstConfiguration defaultConf =
-                new ConcurrentConstConfiguration(new DistributedLogConfiguration());
-            DynamicDistributedLogConfiguration conf =
-                new DynamicDistributedLogConfiguration(defaultConf);
-            List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(properties);
-            confSub = new ConfigurationSubscription(
-                conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit);
-            this.dynConf = Optional.of(conf);
-        } catch (MalformedURLException ex) {
-            throw new ConfigurationException(ex);
-        }
-    }
-
-    @Override
-    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
-        return dynConf;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
deleted file mode 100644
index 195f29d..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import com.google.common.base.Optional;
-
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * For all streams return an absent configuration.
- */
-public class NullStreamConfigProvider implements StreamConfigProvider {
-    static final Logger LOG = LoggerFactory.getLogger(NullStreamConfigProvider.class);
-
-    private static final Optional<DynamicDistributedLogConfiguration> nullConf =
-            Optional.<DynamicDistributedLogConfiguration>absent();
-
-    @Override
-    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
-        return nullConf;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
deleted file mode 100644
index 257b4be..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.SystemConfiguration;
-
-/**
- * Configuration for DistributedLog Server.
- */
-public class ServerConfiguration extends CompositeConfiguration {
-
-    private static ClassLoader defaultLoader;
-
-    static {
-        defaultLoader = Thread.currentThread().getContextClassLoader();
-        if (null == defaultLoader) {
-            defaultLoader = DistributedLogConfiguration.class.getClassLoader();
-        }
-    }
-
-    // Server DLSN version
-    protected static final String SERVER_DLSN_VERSION = "server_dlsn_version";
-    protected static final byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1;
-
-    // Server Durable Write Enable/Disable Flag
-    protected static final String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled";
-    protected static final boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true;
-
-    // Server Region Id
-    protected static final String SERVER_REGION_ID = "server_region_id";
-    protected static final int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID;
-
-    // Server Port
-    protected static final String SERVER_PORT = "server_port";
-    protected static final int SERVER_PORT_DEFAULT = 0;
-
-    // Server Shard Id
-    protected static final String SERVER_SHARD_ID = "server_shard";
-    protected static final int SERVER_SHARD_ID_DEFAULT = -1;
-
-    // Server Threads
-    protected static final String SERVER_NUM_THREADS = "server_threads";
-    protected static final int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
-
-    // Server enable per stream stat
-    protected static final String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat";
-    protected static final boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true;
-
-    // Server graceful shutdown period (in millis)
-    protected static final String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms";
-    protected static final long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L;
-
-    // Server service timeout
-    public static final String SERVER_SERVICE_TIMEOUT_MS = "server_service_timeout_ms";
-    public static final String SERVER_SERVICE_TIMEOUT_MS_OLD = "serviceTimeoutMs";
-    public static final long SERVER_SERVICE_TIMEOUT_MS_DEFAULT = 0;
-
-    // Server close writer timeout
-    public static final String SERVER_WRITER_CLOSE_TIMEOUT_MS = "server_writer_close_timeout_ms";
-    public static final long SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT = 1000;
-
-    // Server stream probation timeout
-    public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms";
-    public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs";
-    public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60 * 1000 * 5;
-
-    // Server stream to partition converter
-    protected static final String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class";
-
-    // Use hostname as the allocator pool name
-    protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME =
-        "server_use_hostname_as_allocator_pool_name";
-    protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false;
-    //Configure refresh interval for calculating resource placement in seconds
-    public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S =
-        "server_resource_placement_refresh_interval_sec";
-    public static final int  SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120;
-
-    public ServerConfiguration() {
-        super();
-        addConfiguration(new SystemConfiguration());
-    }
-
-    /**
-     * Load configurations from {@link DistributedLogConfiguration}.
-     *
-     * @param dlConf
-     *          distributedlog configuration
-     */
-    public void loadConf(DistributedLogConfiguration dlConf) {
-        addConfiguration(dlConf);
-    }
-
-    /**
-     * Set the version to encode dlsn.
-     *
-     * @param version
-     *          dlsn version
-     * @return server configuration
-     */
-    public ServerConfiguration setDlsnVersion(byte version) {
-        setProperty(SERVER_DLSN_VERSION, version);
-        return this;
-    }
-
-    /**
-     * Get the version to encode dlsn.
-     *
-     * @see DLSN
-     * @return version to encode dlsn.
-     */
-    public byte getDlsnVersion() {
-        return getByte(SERVER_DLSN_VERSION, SERVER_DLSN_VERSION_DEFAULT);
-    }
-
-    /**
-     * Set the flag to enable/disable durable write.
-     *
-     * @param enabled
-     *          flag to enable/disable durable write
-     * @return server configuration
-     */
-    public ServerConfiguration enableDurableWrite(boolean enabled) {
-        setProperty(SERVER_DURABLE_WRITE_ENABLED, enabled);
-        return this;
-    }
-
-    /**
-     * Is durable write enabled.
-     *
-     * @return true if waiting writes to be durable. otherwise false.
-     */
-    public boolean isDurableWriteEnabled() {
-        return getBoolean(SERVER_DURABLE_WRITE_ENABLED, SERVER_DURABLE_WRITE_ENABLED_DEFAULT);
-    }
-
-    /**
-     * Set the region id used to instantiate DistributedLogNamespace.
-     *
-     * @param regionId
-     *          region id
-     * @return server configuration
-     */
-    public ServerConfiguration setRegionId(int regionId) {
-        setProperty(SERVER_REGION_ID, regionId);
-        return this;
-    }
-
-    /**
-     * Get the region id used to instantiate {@link org.apache.distributedlog.namespace.DistributedLogNamespace}.
-     *
-     * @return region id used to instantiate DistributedLogNamespace
-     */
-    public int getRegionId() {
-        return getInt(SERVER_REGION_ID, SERVER_REGION_ID_DEFAULT);
-    }
-
-    /**
-     * Set the server port running for this service.
-     *
-     * @param port
-     *          server port
-     * @return server configuration
-     */
-    public ServerConfiguration setServerPort(int port) {
-        setProperty(SERVER_PORT, port);
-        return this;
-    }
-
-    /**
-     * Get the server port running for this service.
-     *
-     * @return server port
-     */
-    public int getServerPort() {
-        return getInt(SERVER_PORT, SERVER_PORT_DEFAULT);
-    }
-
-    /**
-     * Set the shard id of this server.
-     *
-     * @param shardId
-     *          shard id
-     * @return shard id of this server
-     */
-    public ServerConfiguration setServerShardId(int shardId) {
-        setProperty(SERVER_SHARD_ID, shardId);
-        return this;
-    }
-
-    /**
-     * Get the shard id of this server.
-     *
-     * <p>It would be used to instantiate the client id used for DistributedLogNamespace.
-     *
-     * @return shard id of this server.
-     */
-    public int getServerShardId() {
-        return getInt(SERVER_SHARD_ID, SERVER_SHARD_ID_DEFAULT);
-    }
-
-    /**
-     * Get the number of threads for the executor of this server.
-     *
-     * @return number of threads for the executor running in this server.
-     */
-    public int getServerThreads() {
-        return getInt(SERVER_NUM_THREADS, SERVER_NUM_THREADS_DEFAULT);
-    }
-
-    /**
-     * Set the number of threads for the executor of this server.
-     *
-     * @param numThreads
-     *          number of threads for the executor running in this server.
-     * @return server configuration
-     */
-    public ServerConfiguration setServerThreads(int numThreads) {
-        setProperty(SERVER_NUM_THREADS, numThreads);
-        return this;
-    }
-
-    /**
-     * Enable/Disable per stream stat.
-     *
-     * @param enabled
-     *          flag to enable/disable per stream stat
-     * @return server configuration
-     */
-    public ServerConfiguration setPerStreamStatEnabled(boolean enabled) {
-        setProperty(SERVER_ENABLE_PERSTREAM_STAT, enabled);
-        return this;
-    }
-
-    /**
-     * Whether the per stream stat enabled for not in this server.
-     *
-     * @return true if per stream stat enable, otherwise false.
-     */
-    public boolean isPerStreamStatEnabled() {
-        return getBoolean(SERVER_ENABLE_PERSTREAM_STAT, SERVER_ENABLE_PERSTREAM_STAT_DEFAULT);
-    }
-
-    /**
-     * Set the graceful shutdown period in millis.
-     *
-     * @param periodMs
-     *          graceful shutdown period in millis.
-     * @return server configuration
-     */
-    public ServerConfiguration setGracefulShutdownPeriodMs(long periodMs) {
-        setProperty(SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS, periodMs);
-        return this;
-    }
-
-    /**
-     * Get the graceful shutdown period in millis.
-     *
-     * @return graceful shutdown period in millis.
-     */
-    public long getGracefulShutdownPeriodMs() {
-        return getLong(SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS, SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT);
-    }
-
-    /**
-     * Get timeout for stream op execution in proxy layer.
-     *
-     * <p>0 disables timeout.
-     *
-     * @return timeout for stream operation in proxy layer.
-     */
-    public long getServiceTimeoutMs() {
-        return getLong(SERVER_SERVICE_TIMEOUT_MS,
-                getLong(SERVER_SERVICE_TIMEOUT_MS_OLD, SERVER_SERVICE_TIMEOUT_MS_DEFAULT));
-    }
-
-    /**
-     * Set timeout for stream op execution in proxy layer.
-     *
-     * <p>0 disables timeout.
-     *
-     * @param timeoutMs
-     *          timeout for stream operation in proxy layer.
-     * @return dl configuration.
-     */
-    public ServerConfiguration setServiceTimeoutMs(long timeoutMs) {
-        setProperty(SERVER_SERVICE_TIMEOUT_MS, timeoutMs);
-        return this;
-    }
-
-    /**
-     * Get timeout for closing writer in proxy layer.
-     *
-     * <p>0 disables timeout.
-     *
-     * @return timeout for closing writer in proxy layer.
-     */
-    public long getWriterCloseTimeoutMs() {
-        return getLong(SERVER_WRITER_CLOSE_TIMEOUT_MS, SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT);
-    }
-
-    /**
-     * Set timeout for closing writer in proxy layer.
-     *
-     * <p>0 disables timeout.
-     *
-     * @param timeoutMs
-     *          timeout for closing writer in proxy layer.
-     * @return dl configuration.
-     */
-    public ServerConfiguration setWriterCloseTimeoutMs(long timeoutMs) {
-        setProperty(SERVER_WRITER_CLOSE_TIMEOUT_MS, timeoutMs);
-        return this;
-    }
-
-    /**
-     * How long should stream be kept in cache in probationary state after service timeout.
-     *
-     * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
-     *
-     * @return stream probation timeout in ms.
-     */
-    public long getStreamProbationTimeoutMs() {
-        return getLong(SERVER_STREAM_PROBATION_TIMEOUT_MS,
-                getLong(SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD, SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT));
-    }
-
-    /**
-     * How long should stream be kept in cache in probationary state after service timeout.
-     *
-     * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
-     *
-     * @param timeoutMs probation timeout in ms.
-     * @return server configuration
-     */
-    public ServerConfiguration setStreamProbationTimeoutMs(long timeoutMs) {
-        setProperty(SERVER_STREAM_PROBATION_TIMEOUT_MS, timeoutMs);
-        return this;
-    }
-
-    /**
-     * Set the stream partition converter class.
-     *
-     * @param converterClass
-     *          stream partition converter class
-     * @return server configuration
-     */
-    public ServerConfiguration setStreamPartitionConverterClass(
-        Class<? extends StreamPartitionConverter> converterClass) {
-        setProperty(SERVER_STREAM_PARTITION_CONVERTER_CLASS, converterClass.getName());
-        return this;
-    }
-
-    /**
-     * Get the stream partition converter class.
-     *
-     * @return the stream partition converter class.
-     * @throws ConfigurationException
-     */
-    public Class<? extends StreamPartitionConverter> getStreamPartitionConverterClass()
-            throws ConfigurationException {
-        return ReflectionUtils.getClass(
-                this,
-                SERVER_STREAM_PARTITION_CONVERTER_CLASS,
-                IdentityStreamPartitionConverter.class,
-                StreamPartitionConverter.class,
-                defaultLoader);
-    }
-
-     /**
-      * Set if use hostname as the allocator pool name.
-      *
-      * @param useHostname whether to use hostname as the allocator pool name.
-      * @return server configuration
-      * @see #isUseHostnameAsAllocatorPoolName()
-      */
-    public ServerConfiguration setUseHostnameAsAllocatorPoolName(boolean useHostname) {
-        setProperty(SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME, useHostname);
-        return this;
-    }
-
-    /**
-     * Get if use hostname as the allocator pool name.
-     *
-     * @return true if use hostname as the allocator pool name. otherwise, use
-     * {@link #getServerShardId()} as the allocator pool name.
-     * @see #getServerShardId()
-     */
-    public boolean isUseHostnameAsAllocatorPoolName() {
-        return getBoolean(SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME,
-            SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT);
-    }
-
-    public ServerConfiguration setResourcePlacementRefreshInterval(int refreshIntervalSecs) {
-        setProperty(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, refreshIntervalSecs);
-        return this;
-    }
-
-    public int getResourcePlacementRefreshInterval() {
-        return getInt(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT);
-    }
-
-    /**
-     * Validate the configuration.
-     *
-     * @throws IllegalStateException when there are any invalid settings.
-     */
-    public void validate() {
-        byte dlsnVersion = getDlsnVersion();
-        checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1,
-                "Unknown dlsn version " + dlsnVersion);
-        checkArgument(getServerThreads() > 0,
-                "Invalid number of server threads : " + getServerThreads());
-        checkArgument(getServerShardId() >= 0,
-                "Invalid server shard id : " + getServerShardId());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
deleted file mode 100644
index 29052f9..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import com.google.common.base.Optional;
-import org.apache.distributedlog.config.DynamicConfigurationFactory;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import java.io.File;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provide per stream configuration to DistributedLog service layer.
- */
-public class ServiceStreamConfigProvider implements StreamConfigProvider {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class);
-
-    private static final String CONFIG_EXTENSION = "conf";
-
-    private final File configBaseDir;
-    private final File defaultConfigFile;
-    private final StreamPartitionConverter partitionConverter;
-    private final DynamicConfigurationFactory configFactory;
-    private final DynamicDistributedLogConfiguration defaultDynConf;
-
-    public ServiceStreamConfigProvider(String configPath,
-                                       String defaultConfigPath,
-                                       StreamPartitionConverter partitionConverter,
-                                       ScheduledExecutorService executorService,
-                                       int reloadPeriod,
-                                       TimeUnit reloadUnit)
-                                       throws ConfigurationException {
-        this.configBaseDir = new File(configPath);
-        if (!configBaseDir.exists()) {
-            throw new ConfigurationException("Stream configuration base directory "
-                + configPath + " does not exist");
-        }
-        this.defaultConfigFile = new File(configPath);
-        if (!defaultConfigFile.exists()) {
-            throw new ConfigurationException("Stream configuration default config "
-                + defaultConfigPath + " does not exist");
-        }
-
-        // Construct reloading default configuration
-        this.partitionConverter = partitionConverter;
-        this.configFactory = new DynamicConfigurationFactory(executorService, reloadPeriod, reloadUnit);
-        // We know it exists from the check above.
-        this.defaultDynConf = configFactory.getDynamicConfiguration(defaultConfigPath).get();
-    }
-
-    @Override
-    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
-        String configName = partitionConverter.convert(streamName).getStream();
-        String configPath = getConfigPath(configName);
-        Optional<DynamicDistributedLogConfiguration> dynConf = Optional.<DynamicDistributedLogConfiguration>absent();
-        try {
-            dynConf = configFactory.getDynamicConfiguration(configPath, defaultDynConf);
-        } catch (ConfigurationException ex) {
-            LOG.warn("Configuration exception for stream {} ({}) at {}",
-                    new Object[] {streamName, configName, configPath, ex});
-        }
-        return dynConf;
-    }
-
-    private String getConfigPath(String configName) {
-        return new File(configBaseDir, String.format("%s.%s", configName, CONFIG_EXTENSION)).getPath();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
deleted file mode 100644
index c704f70..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import com.google.common.base.Optional;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-
-/**
- * Expose per-stream configs to dl proxy.
- */
-public interface StreamConfigProvider {
-    /**
-     * Get dynamic per stream config overrides for a given stream.
-     *
-     * @param streamName stream name to return config for
-     * @return Optional dynamic configuration instance
-     */
-    Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java
deleted file mode 100644
index b07605e..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Server Configurations.
- */
-package org.apache.distributedlog.service.config;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java
deleted file mode 100644
index 3fcfeda..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Proxy Service.
- */
-package org.apache.distributedlog.service;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
deleted file mode 100644
index fa3dd49..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import com.twitter.util.Future;
-
-/**
- * Equal Load Appraiser.
- *
- * <p>Created for those who hold these truths to be self-evident, that all streams are created equal,
- * that they are endowed by their creator with certain unalienable loads, that among these are
- * Uno, Eins, and One.
- */
-public class EqualLoadAppraiser implements LoadAppraiser {
-    @Override
-    public Future<StreamLoad> getStreamLoad(String stream) {
-        return Future.value(new StreamLoad(stream, 1));
-    }
-
-    @Override
-    public Future<Void> refreshCache() {
-        return Future.value(null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
deleted file mode 100644
index 2e9dd6b..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-/**
- * Least Load Placement Policy.
- *
- * <p>A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
- * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what
- * the load of a server would be. This placement policy then distributes these streams across the
- * servers.
- */
-public class LeastLoadPlacementPolicy extends PlacementPolicy {
-
-    private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class);
-
-    private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
-    private Map<String, String> streamToServer = new HashMap<String, String>();
-
-    public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
-                                    DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
-                                    Duration refreshInterval, StatsLogger statsLogger) {
-        super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
-        statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                if (serverLoads.size() > 0) {
-                    return serverLoads.last().getLoad() - serverLoads.first().getLoad();
-                } else {
-                    return getDefaultValue();
-                }
-            }
-        });
-    }
-
-    private synchronized String getStreamOwner(String stream) {
-        return streamToServer.get(stream);
-    }
-
-    @Override
-    public Future<String> placeStream(String stream) {
-        String streamOwner = getStreamOwner(stream);
-        if (null != streamOwner) {
-            return Future.value(streamOwner);
-        }
-        Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
-        return streamLoadFuture.map(new Function<StreamLoad, String>() {
-            @Override
-            public String apply(StreamLoad streamLoad) {
-                return placeStreamSynchronized(streamLoad);
-            }
-        });
-    }
-
-    private synchronized String placeStreamSynchronized(StreamLoad streamLoad) {
-        ServerLoad serverLoad = serverLoads.pollFirst();
-        serverLoad.addStream(streamLoad);
-        serverLoads.add(serverLoad);
-        return serverLoad.getServer();
-    }
-
-    @Override
-    public void refresh() {
-        logger.info("Refreshing server loads.");
-        Future<Void> refresh = loadAppraiser.refreshCache();
-        final Set<String> servers = getServers();
-        final Set<String> allStreams = getStreams();
-        Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(
-            new Function<Void, Future<TreeSet<ServerLoad>>>() {
-            @Override
-            public Future<TreeSet<ServerLoad>> apply(Void v1) {
-                return calculate(servers, allStreams);
-            }
-        });
-        serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
-                try {
-                    updateServerLoads(serverLoads);
-                } catch (PlacementStateManager.StateManagerSaveException e) {
-                    logger.error("The refreshed mapping could not be persisted and will not be used.", e);
-                }
-                return BoxedUnit.UNIT;
-            }
-        });
-    }
-
-    private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads)
-        throws PlacementStateManager.StateManagerSaveException {
-        this.placementStateManager.saveOwnership(serverLoads);
-        this.streamToServer = serverLoadsToMap(serverLoads);
-        this.serverLoads = serverLoads;
-    }
-
-    @Override
-    public synchronized void load(TreeSet<ServerLoad> serverLoads) {
-        this.serverLoads = serverLoads;
-        this.streamToServer = serverLoadsToMap(serverLoads);
-    }
-
-    public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
-        logger.info("Calculating server loads");
-        final long startTime = System.currentTimeMillis();
-        ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
-
-        for (String stream : streams) {
-            Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
-            futures.add(streamLoad);
-        }
-
-        return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
-            @Override
-            public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
-        /* Sort streamLoads so largest streams are placed first for better balance */
-                TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
-                for (StreamLoad streamLoad : streamLoads) {
-                    streamQueue.add(streamLoad);
-                }
-
-                TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
-                for (String server : servers) {
-                    ServerLoad serverLoad = new ServerLoad(server);
-                    if (!streamQueue.isEmpty()) {
-                        serverLoad.addStream(streamQueue.pollFirst());
-                    }
-                    serverLoads.add(serverLoad);
-                }
-
-                while (!streamQueue.isEmpty()) {
-                    ServerLoad serverLoad = serverLoads.pollFirst();
-                    serverLoad.addStream(streamQueue.pollFirst());
-                    serverLoads.add(serverLoad);
-                }
-                return serverLoads;
-            }
-        }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
-                placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
-                return BoxedUnit.UNIT;
-            }
-        }).onFailure(new Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                logger.error("Failure calculating loads", t);
-                placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
-                return BoxedUnit.UNIT;
-            }
-        });
-    }
-
-    private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
-        HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
-        for (ServerLoad serverLoad : serverLoads) {
-            for (StreamLoad streamLoad : serverLoad.getStreamLoads()) {
-                streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
-            }
-        }
-        return streamToServer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
deleted file mode 100644
index 5cd8980..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import com.twitter.util.Future;
-
-/**
- * Interface for load appraiser.
- */
-public interface LoadAppraiser {
-    /**
-     * Retrieve the stream load for a given {@code stream}.
-     *
-     * @param stream name of the stream
-     * @return the stream load of the stream.
-     */
-    Future<StreamLoad> getStreamLoad(String stream);
-
-    /**
-     * Refesch the cache.
-     * @return
-     */
-    Future<Void> refreshCache();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
deleted file mode 100644
index ac952aa..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.DLSocketAddress;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.ScheduledThreadPoolTimer;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.TreeSet;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-/**
- * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream contains.
- *
- * <p>The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
- * then distributed these StreamLoads to the available servers in a manner defined by the
- * implementation creating ServerLoad objects. It then saves this assignment via the
- * PlacementStateManager.
- */
-public abstract class PlacementPolicy {
-
-    private static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
-
-    protected final LoadAppraiser loadAppraiser;
-    protected final RoutingService routingService;
-    protected final DistributedLogNamespace namespace;
-    protected final PlacementStateManager placementStateManager;
-    private final Duration refreshInterval;
-    protected final OpStatsLogger placementCalcStats;
-    private Timer placementRefreshTimer;
-
-    public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
-                           DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
-                           Duration refreshInterval, StatsLogger statsLogger) {
-        this.loadAppraiser = loadAppraiser;
-        this.routingService = routingService;
-        this.namespace = namespace;
-        this.placementStateManager = placementStateManager;
-        this.refreshInterval = refreshInterval;
-        placementCalcStats = statsLogger.getOpStatsLogger("placement");
-    }
-
-    public Set<String> getServers() {
-        Set<SocketAddress> hosts = routingService.getHosts();
-        Set<String> servers = new HashSet<String>(hosts.size());
-        for (SocketAddress address : hosts) {
-            servers.add(DLSocketAddress.toString((InetSocketAddress) address));
-        }
-        return servers;
-    }
-
-    public Set<String> getStreams() {
-        Set<String> streams = new HashSet<String>();
-        try {
-            Iterator<String> logs = namespace.getLogs();
-            while (logs.hasNext()) {
-                streams.add(logs.next());
-            }
-        } catch (IOException e) {
-            logger.error("Could not get streams for placement policy.", e);
-        }
-        return streams;
-    }
-
-    public void start(boolean leader) {
-        logger.info("Starting placement policy");
-
-        TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
-        for (String server : getServers()) {
-            emptyServerLoads.add(new ServerLoad(server));
-        }
-        load(emptyServerLoads); //Pre-Load so streams don't NPE
-        if (leader) { //this is the leader shard
-            logger.info("Shard is leader. Scheduling timed refresh.");
-            placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
-            placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
-                @Override
-                public BoxedUnit apply() {
-                    refresh();
-                    return BoxedUnit.UNIT;
-                }
-            });
-        } else {
-            logger.info("Shard is not leader. Watching for server load changes.");
-            placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
-                @Override
-                public void callback(TreeSet<ServerLoad> serverLoads) {
-                    if (!serverLoads.isEmpty()) {
-                        load(serverLoads);
-                    }
-                }
-            });
-        }
-    }
-
-    public void close() {
-        if (placementRefreshTimer != null) {
-            placementRefreshTimer.stop();
-        }
-    }
-
-    /**
-     * Places the stream on a server according to the policy.
-     *
-     * <p>It returns a future containing the host that owns the stream upon completion
-     */
-    public abstract Future<String> placeStream(String stream);
-
-    /**
-     * Recalculates the entire placement mapping and updates stores it using the PlacementStateManager.
-     */
-    public abstract void refresh();
-
-    /**
-     * Loads the placement mapping into the node from a TreeSet of ServerLoads.
-     */
-    public abstract void load(TreeSet<ServerLoad> serverLoads);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
deleted file mode 100644
index 0187bed..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import java.util.TreeSet;
-
-/**
- * The PlacementStateManager handles persistence of calculated resource placements.
- */
-public interface PlacementStateManager {
-
-    /**
-     * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage.
-     */
-    void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
-
-    /**
-     * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage.
-     */
-    TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
-
-    /**
-     * Watch the persistent storage for changes to the ownership mapping.
-     *
-     * <p>The placementCallback callbacks will be triggered with the new mapping when a change occurs.
-     */
-    void watch(PlacementCallback placementCallback);
-
-    /**
-     * Placement Callback.
-     *
-     * <p>The callback is triggered when server loads are updated.
-     */
-    interface PlacementCallback {
-        void callback(TreeSet<ServerLoad> serverLoads);
-    }
-
-    /**
-     * The base exception thrown when state manager encounters errors.
-     */
-    abstract class StateManagerException extends Exception {
-        public StateManagerException(String message, Exception e) {
-            super(message, e);
-        }
-    }
-
-    /**
-     * Exception thrown when failed to load the ownership mapping.
-     */
-    class StateManagerLoadException extends StateManagerException {
-        public StateManagerLoadException(Exception e) {
-            super("Load of Ownership failed", e);
-        }
-    }
-
-    /**
-     * Exception thrown when failed to save the ownership mapping.
-     */
-    class StateManagerSaveException extends StateManagerException {
-        public StateManagerSaveException(Exception e) {
-            super("Save of Ownership failed", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
deleted file mode 100644
index d65c401..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-
-/**
- * An object represents the server load.
- *
- * <p>A comparable data object containing the identifier of the server, total appraised load on the
- * server, and all streams assigned to the server by the resource placement mapping. This is
- * comparable first by load and then by server so that a sorted data structure of these will be
- * consistent across multiple calculations.
- */
-public class ServerLoad implements Comparable {
-    private static final int BUFFER_SIZE = 4096000;
-    private final String server;
-    private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
-    private long load = 0L;
-
-    public ServerLoad(String server) {
-        this.server = server;
-    }
-
-    public synchronized long addStream(StreamLoad stream) {
-        this.load += stream.getLoad();
-        streamLoads.add(stream);
-        return this.load;
-    }
-
-    public synchronized long removeStream(String stream) {
-        for (StreamLoad streamLoad : streamLoads) {
-            if (streamLoad.stream.equals(stream)) {
-                this.load -= streamLoad.getLoad();
-                streamLoads.remove(streamLoad);
-                return this.load;
-            }
-        }
-        return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
-    }
-
-    public synchronized long getLoad() {
-        return load;
-    }
-
-    public synchronized Set<StreamLoad> getStreamLoads() {
-        return streamLoads;
-    }
-
-    public synchronized String getServer() {
-        return server;
-    }
-
-    protected synchronized org.apache.distributedlog.service.placement.thrift.ServerLoad toThrift() {
-        org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
-            new org.apache.distributedlog.service.placement.thrift.ServerLoad();
-        tServerLoad.setServer(server);
-        tServerLoad.setLoad(load);
-        ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads =
-            new ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad>();
-        for (StreamLoad streamLoad : streamLoads) {
-            tStreamLoads.add(streamLoad.toThrift());
-        }
-        tServerLoad.setStreams(tStreamLoads);
-        return tServerLoad;
-    }
-
-    public byte[] serialize() throws IOException {
-        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            toThrift().write(protocol);
-            transport.flush();
-            return transport.toString(UTF_8.name()).getBytes(UTF_8);
-        } catch (TException e) {
-            throw new IOException("Failed to serialize server load : ", e);
-        } catch (UnsupportedEncodingException uee) {
-            throw new IOException("Failed to serialize server load : ", uee);
-        }
-    }
-
-    public static ServerLoad deserialize(byte[] data) throws IOException {
-        org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
-            new org.apache.distributedlog.service.placement.thrift.ServerLoad();
-        TMemoryInputTransport transport = new TMemoryInputTransport(data);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            tServerLoad.read(protocol);
-            ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
-            if (tServerLoad.isSetStreams()) {
-                for (org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad :
-                    tServerLoad.getStreams()) {
-                    serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
-                }
-            }
-            return serverLoad;
-        } catch (TException e) {
-            throw new IOException("Failed to deserialize server load : ", e);
-        }
-    }
-
-    @Override
-    public synchronized int compareTo(Object o) {
-        ServerLoad other = (ServerLoad) o;
-        if (load == other.getLoad()) {
-            return server.compareTo(other.getServer());
-        } else {
-            return Long.compare(load, other.getLoad());
-        }
-    }
-
-    @Override
-    public synchronized boolean equals(Object o) {
-        if (!(o instanceof ServerLoad)) {
-            return false;
-        }
-        ServerLoad other = (ServerLoad) o;
-        return server.equals(other.getServer())
-            && load == other.getLoad()
-            && streamLoads.equals(other.getStreamLoads());
-    }
-
-    @Override
-    public synchronized String toString() {
-        return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
-    }
-
-    @Override
-    public synchronized int hashCode() {
-        return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
deleted file mode 100644
index f271222..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-
-/**
- * An object represent the load of a stream.
- *
- * <p>A comparable data object containing the identifier of the stream and the appraised load produced
- * by the stream.
- */
-public class StreamLoad implements Comparable {
-    private static final int BUFFER_SIZE = 4096;
-    public final String stream;
-    private final int load;
-
-    public StreamLoad(String stream, int load) {
-        this.stream = stream;
-        this.load = load;
-    }
-
-    public int getLoad() {
-        return load;
-    }
-
-    public String getStream() {
-        return stream;
-    }
-
-    protected org.apache.distributedlog.service.placement.thrift.StreamLoad toThrift() {
-        org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
-            new org.apache.distributedlog.service.placement.thrift.StreamLoad();
-        return tStreamLoad.setStream(stream).setLoad(load);
-    }
-
-    public byte[] serialize() throws IOException {
-        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            toThrift().write(protocol);
-            transport.flush();
-            return transport.toString(UTF_8.name()).getBytes(UTF_8);
-        } catch (TException e) {
-            throw new IOException("Failed to serialize stream load : ", e);
-        } catch (UnsupportedEncodingException uee) {
-            throw new IOException("Failed to serialize stream load : ", uee);
-        }
-    }
-
-    public static StreamLoad deserialize(byte[] data) throws IOException {
-        org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
-            new org.apache.distributedlog.service.placement.thrift.StreamLoad();
-        TMemoryInputTransport transport = new TMemoryInputTransport(data);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            tStreamLoad.read(protocol);
-            return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
-        } catch (TException e) {
-            throw new IOException("Failed to deserialize stream load : ", e);
-        }
-    }
-
-    @Override
-    public int compareTo(Object o) {
-        StreamLoad other = (StreamLoad) o;
-        if (load == other.getLoad()) {
-            return stream.compareTo(other.getStream());
-        } else {
-            return Long.compare(load, other.getLoad());
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof StreamLoad)) {
-            return false;
-        }
-        StreamLoad other = (StreamLoad) o;
-        return stream.equals(other.getStream()) && load == other.getLoad();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(stream).append(load).build();
-    }
-}