You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:09 UTC

[02/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
deleted file mode 100644
index d0a2f88..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.feature.DefaultFeatureProvider;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.feature.SettableFeature;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test Case for {@link org.apache.distributedlog.exceptions.RegionUnavailableException}.
- */
-public class TestRegionUnavailable extends DistributedLogServerTestCase {
-
-    /**
-     * A feature provider for testing.
-     */
-    public static class TestFeatureProvider extends DefaultFeatureProvider {
-
-        public TestFeatureProvider(String rootScope,
-                                   DistributedLogConfiguration conf,
-                                   StatsLogger statsLogger) {
-            super(rootScope, conf, statsLogger);
-        }
-
-        @Override
-        protected Feature makeFeature(String featureName) {
-            if (featureName.contains(ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase())) {
-                return new SettableFeature(featureName, 10000);
-            }
-            return super.makeFeature(featureName);
-        }
-
-        @Override
-        protected FeatureProvider makeProvider(String fullScopeName) {
-            return super.makeProvider(fullScopeName);
-        }
-    }
-
-    private final int numServersPerDC = 3;
-    private final List<DLServer> localCluster;
-    private final List<DLServer> remoteCluster;
-    private TwoRegionDLClient client;
-
-    public TestRegionUnavailable() {
-        super(true);
-        this.localCluster = new ArrayList<DLServer>();
-        this.remoteCluster = new ArrayList<DLServer>();
-    }
-
-    @Before
-    @Override
-    public void setup() throws Exception {
-        DistributedLogConfiguration localConf = new DistributedLogConfiguration();
-        localConf.addConfiguration(conf);
-        localConf.setFeatureProviderClass(TestFeatureProvider.class);
-        DistributedLogConfiguration remoteConf = new DistributedLogConfiguration();
-        remoteConf.addConfiguration(conf);
-        super.setup();
-        int localPort = 9010;
-        int remotePort = 9020;
-        for (int i = 0; i < numServersPerDC; i++) {
-            localCluster.add(createDistributedLogServer(localConf, localPort + i));
-            remoteCluster.add(createDistributedLogServer(remoteConf, remotePort + i));
-        }
-        Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>();
-        for (DLServer server : localCluster) {
-            regionMap.put(server.getAddress(), "local");
-        }
-        for (DLServer server : remoteCluster) {
-            regionMap.put(server.getAddress(), "remote");
-        }
-        client = createTwoRegionDLClient("two_regions_client", regionMap);
-
-    }
-
-    private void registerStream(String streamName) {
-        for (DLServer server : localCluster) {
-            client.localRoutingService.addHost(streamName, server.getAddress());
-        }
-        client.remoteRoutingService.addHost(streamName, remoteCluster.get(0).getAddress());
-    }
-
-    @After
-    @Override
-    public void teardown() throws Exception {
-        super.teardown();
-        if (null != client) {
-            client.shutdown();
-        }
-        for (DLServer server : localCluster) {
-            server.shutdown();
-        }
-        for (DLServer server : remoteCluster) {
-            server.shutdown();
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testRegionUnavailable() throws Exception {
-        String name = "dlserver-region-unavailable";
-        registerStream(name);
-
-        for (long i = 1; i <= 10; i++) {
-            client.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
-        }
-
-        // check local region
-        for (DLServer server : localCluster) {
-            checkStreams(0, server);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
deleted file mode 100644
index c8b8bdf..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static org.junit.Assert.assertEquals;
-
-import com.twitter.finagle.Service;
-import com.twitter.finagle.service.ConstantService;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.junit.Test;
-
-/**
- * Test Case for {@link StatsFilter}.
- */
-public class TestStatsFilter {
-
-    class RuntimeExService<Req, Rep> extends Service<Req, Rep> {
-        public Future<Rep> apply(Req request) {
-            throw new RuntimeException("test");
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testServiceSuccess() throws Exception {
-        StatsLogger stats = new NullStatsLogger();
-        StatsFilter<String, String> filter = new StatsFilter<String, String>(stats);
-        Future<String> result = filter.apply("", new ConstantService<String, String>(Future.value("result")));
-        assertEquals("result", Await.result(result));
-    }
-
-    @Test(timeout = 60000)
-    public void testServiceFailure() throws Exception {
-        StatsLogger stats = new NullStatsLogger();
-        StatsFilter<String, String> filter = new StatsFilter<String, String>(stats);
-        try {
-            filter.apply("", new RuntimeExService<String, String>());
-        } catch (RuntimeException ex) {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
deleted file mode 100644
index 21bebb5..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Test;
-
-/**
- * Test Case for {@link BalancerUtils}.
- */
-public class TestBalancerUtils {
-
-    @Test(timeout = 60000)
-    public void testCalculateNumStreamsToRebalance() {
-        String myNode = "mynode";
-
-        // empty load distribution
-        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
-                myNode, new HashMap<String, Integer>(), 0, 10));
-        // my node doesn't exist in load distribution
-        Map<String, Integer> loadDistribution = new HashMap<String, Integer>();
-        loadDistribution.put("node1", 10);
-        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
-                myNode, loadDistribution, 0, 10));
-        // my node doesn't reach rebalance water mark
-        loadDistribution.clear();
-        loadDistribution.put("node1", 1);
-        loadDistribution.put(myNode, 100);
-        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
-                myNode, loadDistribution, 200, 10));
-        // my node is below average in the cluster.
-        loadDistribution.clear();
-        loadDistribution.put(myNode, 1);
-        loadDistribution.put("node1", 99);
-        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
-                myNode, loadDistribution, 0, 10));
-        // my node is above average in the cluster
-        assertEquals(49, BalancerUtils.calculateNumStreamsToRebalance(
-                "node1", loadDistribution, 0, 10));
-        // my node is at the tolerance range
-        loadDistribution.clear();
-        loadDistribution.put(myNode, 55);
-        loadDistribution.put("node1", 45);
-        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
-                myNode, loadDistribution, 0, 10));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
deleted file mode 100644
index fb3fb6e..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.fail;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.service.DLSocketAddress;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import org.apache.distributedlog.service.DistributedLogServerTestCase;
-import com.twitter.util.Await;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.commons.lang3.tuple.Pair;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for {@link ClusterBalancer}.
- */
-public class TestClusterBalancer extends DistributedLogServerTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class);
-
-    private final int numServers = 5;
-    private final List<DLServer> cluster;
-    private DLClient client;
-
-    public TestClusterBalancer() {
-        super(true);
-        this.cluster = new ArrayList<DLServer>();
-    }
-
-    @Before
-    @Override
-    public void setup() throws Exception {
-        super.setup();
-        int initPort = 9001;
-        for (int i = 0; i < numServers; i++) {
-            cluster.add(createDistributedLogServer(initPort + i));
-        }
-        client = createDistributedLogClient("cluster_client", Optional.<String>absent());
-    }
-
-    @After
-    @Override
-    public void teardown() throws Exception {
-        super.teardown();
-        if (null != client) {
-            client.shutdown();
-        }
-        for (DLServer server: cluster) {
-            server.shutdown();
-        }
-    }
-
-    private void initStreams(String namePrefix) {
-        logger.info("Init streams with prefix {}", namePrefix);
-        // Stream Distribution: 5, 4, 3, 2, 1
-        initStreams(namePrefix, 5, 1, 0);
-        initStreams(namePrefix, 4, 6, 1);
-        initStreams(namePrefix, 3, 10, 2);
-        initStreams(namePrefix, 2, 13, 3);
-        initStreams(namePrefix, 1, 15, 4);
-    }
-
-    private void initStreams(String namePrefix, int numStreams, int streamId, int proxyId) {
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + (streamId++);
-            client.routingService.addHost(name, cluster.get(proxyId).getAddress());
-        }
-    }
-
-    private void writeStreams(String namePrefix) throws Exception {
-        logger.info("Write streams with prefix {}", namePrefix);
-        writeStreams(namePrefix, 5, 1);
-        writeStreams(namePrefix, 4, 6);
-        writeStreams(namePrefix, 3, 10);
-        writeStreams(namePrefix, 2, 13);
-        writeStreams(namePrefix, 1, 15);
-    }
-
-    private void writeStreams(String namePrefix, int numStreams, int streamId) throws Exception {
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + (streamId++);
-            try {
-                Await.result(client.dlClient.write(name, ByteBuffer.wrap(name.getBytes(UTF_8))));
-            } catch (Exception e) {
-                logger.error("Error writing stream {} : ", name, e);
-                throw e;
-            }
-        }
-    }
-
-    private void validateStreams(String namePrefix) throws Exception {
-        logger.info("Validate streams with prefix {}", namePrefix);
-        validateStreams(namePrefix, 5, 1, 0);
-        validateStreams(namePrefix, 4, 6, 1);
-        validateStreams(namePrefix, 3, 10, 2);
-        validateStreams(namePrefix, 2, 13, 3);
-        validateStreams(namePrefix, 1, 15, 4);
-    }
-
-    private void validateStreams(String namePrefix, int numStreams, int streamId, int proxyIdx) {
-        Set<String> expectedStreams = new HashSet<String>();
-        for (int i = 0; i < numStreams; i++) {
-            expectedStreams.add(namePrefix + (streamId++));
-        }
-        checkStreams(expectedStreams, cluster.get(proxyIdx));
-    }
-
-    @Ignore
-    @Test(timeout = 60000)
-    public void testBalanceAll() throws Exception {
-        String namePrefix = "clusterbalancer-balance-all-";
-
-        initStreams(namePrefix);
-        writeStreams(namePrefix);
-        validateStreams(namePrefix);
-
-        Optional<RateLimiter> rateLimiter = Optional.absent();
-
-        Balancer balancer = new ClusterBalancer(client.dlClientBuilder,
-                Pair.of((DistributedLogClient) client.dlClient, (MonitorServiceClient) client.dlClient));
-        logger.info("Rebalancing from 'unknown' target");
-        try {
-            balancer.balanceAll("unknown", 10, rateLimiter);
-            fail("Should fail on balanceAll from 'unknown' target.");
-        } catch (IllegalArgumentException iae) {
-            // expected
-        }
-        validateStreams(namePrefix);
-
-        logger.info("Rebalancing from 'unexisted' host");
-        String addr = DLSocketAddress.toString(DLSocketAddress.getSocketAddress(9999));
-        balancer.balanceAll(addr, 10, rateLimiter);
-        validateStreams(namePrefix);
-
-        addr = DLSocketAddress.toString(cluster.get(0).getAddress());
-        logger.info("Rebalancing from host {}.", addr);
-        balancer.balanceAll(addr, 10, rateLimiter);
-        checkStreams(0, cluster.get(0));
-        checkStreams(4, cluster.get(1));
-        checkStreams(3, cluster.get(2));
-        checkStreams(4, cluster.get(3));
-        checkStreams(4, cluster.get(4));
-
-        addr = DLSocketAddress.toString(cluster.get(2).getAddress());
-        logger.info("Rebalancing from host {}.", addr);
-        balancer.balanceAll(addr, 10, rateLimiter);
-        checkStreams(3, cluster.get(0));
-        checkStreams(4, cluster.get(1));
-        checkStreams(0, cluster.get(2));
-        checkStreams(4, cluster.get(3));
-        checkStreams(4, cluster.get(4));
-
-        logger.info("Rebalancing the cluster");
-        balancer.balance(0, 0.0f, 10, rateLimiter);
-        for (int i = 0; i < 5; i++) {
-            checkStreams(3, cluster.get(i));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
deleted file mode 100644
index 6734083..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Sets;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.junit.Test;
-
-/**
- * Test Case for {@link CountBasedStreamChooser}.
- */
-public class TestCountBasedStreamChooser {
-
-    @Test(timeout = 60000)
-    public void testEmptyStreamDistribution() {
-        try {
-            new CountBasedStreamChooser(new HashMap<SocketAddress, Set<String>>());
-            fail("Should fail constructing stream chooser if the stream distribution is empty");
-        } catch (IllegalArgumentException iae) {
-            // expected
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testMultipleHostsWithEmptyStreams() {
-        for (int i = 1; i <= 3; i++) {
-            Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
-            int port = 1000;
-            for (int j = 0; j < i; j++) {
-                SocketAddress address = new InetSocketAddress("127.0.0.1", port + j);
-                streamDistribution.put(address, new HashSet<String>());
-            }
-
-            CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
-            for (int k = 0; k < i + 1; k++) {
-                assertNull(chooser.choose());
-            }
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testSingleHostWithStreams() {
-        for (int i = 0; i < 3; i++) {
-            Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
-
-            Set<String> streams = new HashSet<String>();
-            for (int j = 0; j < 3; j++) {
-                streams.add("SingleHostStream-" + j);
-            }
-
-            int port = 1000;
-            SocketAddress address = new InetSocketAddress("127.0.0.1", port);
-            streamDistribution.put(address, streams);
-
-            for (int k = 1; k <= i; k++) {
-                address = new InetSocketAddress("127.0.0.1", port + k);
-                streamDistribution.put(address, new HashSet<String>());
-            }
-
-            Set<String> choosenStreams = new HashSet<String>();
-
-            CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
-            for (int l = 0; l < 3 + i + 1; l++) {
-                String s = chooser.choose();
-                if (null != s) {
-                    choosenStreams.add(s);
-                }
-            }
-
-            assertEquals(streams.size(), choosenStreams.size());
-            assertTrue(Sets.difference(streams, choosenStreams).immutableCopy().isEmpty());
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testHostsHaveSameNumberStreams() {
-        Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
-        Set<String> allStreams = new HashSet<String>();
-
-        int numHosts = 3;
-        int numStreamsPerHost = 3;
-
-        int port = 1000;
-        for (int i = 1; i <= numHosts; i++) {
-            SocketAddress address = new InetSocketAddress("127.0.0.1", port + i);
-            Set<String> streams = new HashSet<String>();
-
-            for (int j = 1; j <= numStreamsPerHost; j++) {
-                String streamName = "HostsHaveSameNumberStreams-" + i + "-" + j;
-                streams.add(streamName);
-                allStreams.add(streamName);
-            }
-
-            streamDistribution.put(address, streams);
-        }
-
-        Set<String> streamsChoosen = new HashSet<String>();
-        CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
-        for (int i = 1; i <= numStreamsPerHost; i++) {
-            for (int j = 1; j <= numHosts; j++) {
-                String s = chooser.choose();
-                assertNotNull(s);
-                streamsChoosen.add(s);
-            }
-            for (int j = 0; j < numHosts; j++) {
-                assertEquals(numStreamsPerHost - i, chooser.streamsDistribution.get(j).getRight().size());
-            }
-        }
-        assertNull(chooser.choose());
-        assertEquals(numHosts * numStreamsPerHost, streamsChoosen.size());
-        assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty());
-    }
-
-    @Test(timeout = 60000)
-    public void testHostsHaveDifferentNumberStreams() {
-        Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
-        Set<String> allStreams = new HashSet<String>();
-
-        int numHosts = 6;
-        int maxStreamsPerHost = 4;
-
-        int port = 1000;
-        for (int i = 0; i < numHosts; i++) {
-            int group = i / 2;
-            int numStreamsThisGroup = maxStreamsPerHost - group;
-
-            SocketAddress address = new InetSocketAddress("127.0.0.1", port + i);
-            Set<String> streams = new HashSet<String>();
-
-            for (int j = 1; j <= numStreamsThisGroup; j++) {
-                String streamName = "HostsHaveDifferentNumberStreams-" + i + "-" + j;
-                streams.add(streamName);
-                allStreams.add(streamName);
-            }
-
-            streamDistribution.put(address, streams);
-        }
-
-        Set<String> streamsChoosen = new HashSet<String>();
-        CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
-
-        for (int i = 0; i < allStreams.size(); i++) {
-            String s = chooser.choose();
-            assertNotNull(s);
-            streamsChoosen.add(s);
-        }
-        assertNull(chooser.choose());
-        assertEquals(allStreams.size(), streamsChoosen.size());
-        assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty());
-    }
-
-    @Test(timeout = 60000)
-    public void testLimitedStreamChooser() {
-        Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
-
-        Set<String> streams = new HashSet<String>();
-        for (int j = 0; j < 10; j++) {
-            streams.add("SingleHostStream-" + j);
-        }
-
-        int port = 1000;
-        SocketAddress address = new InetSocketAddress("127.0.0.1", port);
-        streamDistribution.put(address, streams);
-
-        Set<String> choosenStreams = new HashSet<String>();
-
-        CountBasedStreamChooser underlying = new CountBasedStreamChooser(streamDistribution);
-        LimitedStreamChooser chooser = LimitedStreamChooser.of(underlying, 1);
-        for (int l = 0; l < 10; l++) {
-            String s = chooser.choose();
-            if (null != s) {
-                choosenStreams.add(s);
-            }
-        }
-
-        assertEquals(1, choosenStreams.size());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
deleted file mode 100644
index 73fa98a..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import org.apache.distributedlog.service.DistributedLogServerTestCase;
-import com.twitter.util.Await;
-import java.nio.ByteBuffer;
-import java.util.Set;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for {@link SimpleBalancer}.
- */
-public class TestSimpleBalancer extends DistributedLogServerTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestSimpleBalancer.class);
-
-    DLClient targetClient;
-    DLServer targetServer;
-
-    public TestSimpleBalancer() {
-        super(true);
-    }
-
-    @Before
-    @Override
-    public void setup() throws Exception {
-        super.setup();
-        targetServer = createDistributedLogServer(7003);
-        targetClient = createDistributedLogClient("target", Optional.<String>absent());
-    }
-
-    @After
-    @Override
-    public void teardown() throws Exception {
-        super.teardown();
-        if (null != targetClient) {
-            targetClient.shutdown();
-        }
-        if (null != targetServer) {
-            targetServer.shutdown();
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testBalanceAll() throws Exception {
-        String namePrefix = "simplebalancer-balance-all-";
-        int numStreams = 10;
-
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + i;
-            // src client
-            dlClient.routingService.addHost(name, dlServer.getAddress());
-            // target client
-            targetClient.routingService.addHost(name, targetServer.getAddress());
-        }
-
-        // write to multiple streams
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + i;
-            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
-        }
-
-        // validation
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + i;
-            checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true);
-            checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);
-        }
-
-        Optional<RateLimiter> rateLimiter = Optional.absent();
-
-        Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, dlClient.dlClient,
-                                               "target", targetClient.dlClient, targetClient.dlClient);
-        logger.info("Rebalancing from 'unknown' target");
-        try {
-            balancer.balanceAll("unknown", 10, rateLimiter);
-            fail("Should fail on balanceAll from 'unknown' target.");
-        } catch (IllegalArgumentException iae) {
-            // expected
-        }
-
-        // nothing to balance from 'target'
-        logger.info("Rebalancing from 'target' target");
-        balancer.balanceAll("target", 1, rateLimiter);
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + i;
-            checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true);
-            checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);
-        }
-
-        // balance all streams from 'source'
-        logger.info("Rebalancing from 'source' target");
-        balancer.balanceAll("source", 10, rateLimiter);
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + i;
-            checkStream(name, targetClient, targetServer, 1, numStreams, numStreams, true, true);
-            checkStream(name, dlClient, dlServer, 0, 0, 0, false, false);
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testBalanceStreams() throws Exception {
-        String namePrefix = "simplebalancer-balance-streams-";
-        int numStreams = 10;
-
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + i;
-            // src client
-            dlClient.routingService.addHost(name, dlServer.getAddress());
-            // target client
-            targetClient.routingService.addHost(name, targetServer.getAddress());
-        }
-
-        // write to multiple streams
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + i;
-            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
-        }
-
-        // validation
-        for (int i = 0; i < numStreams; i++) {
-            String name = namePrefix + i;
-            checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true);
-            checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);
-        }
-
-        Optional<RateLimiter> rateLimiter = Optional.absent();
-
-        Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, dlClient.dlClient,
-                                               "target", targetClient.dlClient, targetClient.dlClient);
-
-        // balance all streams from 'source'
-        logger.info("Rebalancing streams between targets");
-        balancer.balance(0, 0, 10, rateLimiter);
-
-        Set<String> sourceStreams = getAllStreamsFromDistribution(getStreamOwnershipDistribution(dlClient));
-        Set<String> targetStreams = getAllStreamsFromDistribution(getStreamOwnershipDistribution(targetClient));
-
-        assertEquals(numStreams / 2, sourceStreams.size());
-        assertEquals(numStreams / 2, targetStreams.size());
-
-        for (String name : sourceStreams) {
-            checkStream(name, dlClient, dlServer, 1, numStreams / 2, numStreams / 2, true, true);
-            checkStream(name, targetClient, targetServer, 1, numStreams / 2, numStreams / 2, false, false);
-        }
-
-        for (String name : targetStreams) {
-            checkStream(name, targetClient, targetServer, 1, numStreams / 2, numStreams / 2, true, true);
-            checkStream(name, dlClient, dlServer, 1, numStreams / 2, numStreams / 2, false, false);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
deleted file mode 100644
index ce7b2c1..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.base.Optional;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import org.apache.distributedlog.service.DistributedLogServerTestCase;
-import com.twitter.util.Await;
-import java.nio.ByteBuffer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test Case for {@link StreamMover}.
- */
-public class TestStreamMover extends DistributedLogServerTestCase {
-
-    DLClient targetClient;
-    DLServer targetServer;
-
-    public TestStreamMover() {
-        super(true);
-    }
-
-    @Before
-    @Override
-    public void setup() throws Exception {
-        super.setup();
-        targetServer = createDistributedLogServer(7003);
-        targetClient = createDistributedLogClient("target", Optional.<String>absent());
-    }
-
-    @After
-    @Override
-    public void teardown() throws Exception {
-        super.teardown();
-        if (null != targetClient) {
-            targetClient.shutdown();
-        }
-        if (null != targetServer) {
-            targetServer.shutdown();
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testMoveStream() throws Exception {
-        String name = "dlserver-move-stream";
-
-        // src client
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-        // target client
-        targetClient.routingService.addHost(name, targetServer.getAddress());
-
-        // src client write a record to that stream
-        Await.result(((DistributedLogClient) dlClient.dlClient).write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
-        checkStream(name, dlClient, dlServer, 1, 1, 1, true, true);
-        checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);
-
-        StreamMover streamMover = new StreamMoverImpl("source", dlClient.dlClient, dlClient.dlClient,
-                                                      "target", targetClient.dlClient, targetClient.dlClient);
-        assertTrue(streamMover.moveStream(name));
-        checkStream(name, dlClient, dlServer, 0, 0, 0, false, false);
-        checkStream(name, targetClient, targetServer, 1, 1, 1, true, true);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
deleted file mode 100644
index 71dfa45..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-/**
- * Test Case for {@link ServerConfiguration}.
- */
-public class TestServerConfiguration {
-
-    @Test(timeout = 60000, expected = IllegalArgumentException.class)
-    public void testUnassignedShardId() {
-        new ServerConfiguration().validate();
-    }
-
-    @Test(timeout = 60000)
-    public void testAssignedShardId() {
-        ServerConfiguration conf = new ServerConfiguration();
-        conf.setServerShardId(100);
-        conf.validate();
-        assertEquals(100, conf.getServerShardId());
-    }
-
-    @Test(timeout = 60000, expected = IllegalArgumentException.class)
-    public void testInvalidServerThreads() {
-        ServerConfiguration conf = new ServerConfiguration();
-        conf.setServerShardId(100);
-        conf.setServerThreads(-1);
-        conf.validate();
-    }
-
-    @Test(timeout = 60000, expected = IllegalArgumentException.class)
-    public void testInvalidDlsnVersion() {
-        ServerConfiguration conf = new ServerConfiguration();
-        conf.setServerShardId(100);
-        conf.setDlsnVersion((byte) 9999);
-        conf.validate();
-    }
-
-    @Test(timeout = 60000)
-    public void testUseHostnameAsAllocatorPoolName() {
-        ServerConfiguration conf = new ServerConfiguration();
-        assertFalse("Should not use hostname by default", conf.isUseHostnameAsAllocatorPoolName());
-        conf.setUseHostnameAsAllocatorPoolName(true);
-        assertTrue("Should use hostname now", conf.isUseHostnameAsAllocatorPoolName());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
deleted file mode 100644
index bdbde11..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import static org.apache.distributedlog.DistributedLogConfiguration.BKDL_RETENTION_PERIOD_IN_HOURS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.config.PropertiesWriter;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-
-/**
- * Test Case for {@link StreamConfigProvider}.
- */
-public class TestStreamConfigProvider {
-    private static final String DEFAULT_CONFIG_DIR = "conf";
-    private final String defaultConfigPath;
-    private final ScheduledExecutorService configExecutorService;
-
-    public TestStreamConfigProvider() throws Exception {
-        this.configExecutorService = Executors.newScheduledThreadPool(1,
-                new ThreadFactoryBuilder().setNameFormat("DistributedLogService-Dyncfg-%d").build());
-        PropertiesWriter writer = new PropertiesWriter();
-        writer.save();
-        this.defaultConfigPath = writer.getFile().getPath();
-    }
-
-    StreamConfigProvider getServiceProvider(StreamPartitionConverter converter)
-            throws Exception {
-        return getServiceProvider(converter, DEFAULT_CONFIG_DIR);
-    }
-
-    StreamConfigProvider getServiceProvider(
-            StreamPartitionConverter converter,
-            String configPath,
-            String defaultPath) throws Exception {
-        return new ServiceStreamConfigProvider(
-                configPath,
-                defaultPath,
-                converter,
-                configExecutorService,
-                1,
-                TimeUnit.SECONDS);
-    }
-
-    StreamConfigProvider getServiceProvider(
-            StreamPartitionConverter converter,
-            String configPath) throws Exception {
-        return getServiceProvider(converter, configPath, defaultConfigPath);
-    }
-
-    StreamConfigProvider getDefaultProvider(String configFile) throws Exception {
-        return new DefaultStreamConfigProvider(configFile, configExecutorService, 1, TimeUnit.SECONDS);
-    }
-
-    StreamConfigProvider getNullProvider() throws Exception {
-        return new NullStreamConfigProvider();
-    }
-
-    @Test(timeout = 60000)
-    public void testServiceProviderWithConfigRouters() throws Exception {
-        getServiceProvider(new IdentityStreamPartitionConverter());
-    }
-
-    @Test(timeout = 60000)
-    public void testServiceProviderWithMissingConfig() throws Exception {
-        StreamConfigProvider provider = getServiceProvider(new IdentityStreamPartitionConverter());
-        Optional<DynamicDistributedLogConfiguration> config = provider.getDynamicStreamConfig("stream1");
-        assertTrue(config.isPresent());
-    }
-
-    @Test(timeout = 60000)
-    public void testServiceProviderWithDefaultConfigPath() throws Exception {
-        // Default config with property set.
-        PropertiesWriter writer1 = new PropertiesWriter();
-        writer1.setProperty("rpsStreamAcquireServiceLimit", "191919");
-        writer1.save();
-        String fallbackConfPath1 = writer1.getFile().getPath();
-        StreamConfigProvider provider1 = getServiceProvider(new IdentityStreamPartitionConverter(),
-                DEFAULT_CONFIG_DIR, fallbackConfPath1);
-        Optional<DynamicDistributedLogConfiguration> config1 = provider1.getDynamicStreamConfig("stream1");
-
-        // Empty default config.
-        PropertiesWriter writer2 = new PropertiesWriter();
-        writer2.save();
-        String fallbackConfPath2 = writer2.getFile().getPath();
-        StreamConfigProvider provider2 = getServiceProvider(new IdentityStreamPartitionConverter(),
-                DEFAULT_CONFIG_DIR, fallbackConfPath2);
-        Optional<DynamicDistributedLogConfiguration> config2 = provider2.getDynamicStreamConfig("stream1");
-
-        assertEquals(191919, config1.get().getRpsStreamAcquireServiceLimit());
-        assertEquals(-1, config2.get().getRpsStreamAcquireServiceLimit());
-    }
-
-    @Test(timeout = 60000)
-    public void testDefaultProvider() throws Exception {
-        PropertiesWriter writer = new PropertiesWriter();
-        writer.setProperty(BKDL_RETENTION_PERIOD_IN_HOURS, "99");
-        writer.save();
-        StreamConfigProvider provider = getDefaultProvider(writer.getFile().getPath());
-        Optional<DynamicDistributedLogConfiguration> config1 = provider.getDynamicStreamConfig("stream1");
-        Optional<DynamicDistributedLogConfiguration> config2 = provider.getDynamicStreamConfig("stream2");
-        assertTrue(config1.isPresent());
-        assertTrue(config1.get() == config2.get());
-        assertEquals(99, config1.get().getRetentionPeriodHours());
-    }
-
-    @Test(timeout = 60000)
-    public void testNullProvider() throws Exception {
-        StreamConfigProvider provider = getNullProvider();
-        Optional<DynamicDistributedLogConfiguration> config1 = provider.getDynamicStreamConfig("stream1");
-        Optional<DynamicDistributedLogConfiguration> config2 = provider.getDynamicStreamConfig("stream2");
-        assertFalse(config1.isPresent());
-        assertTrue(config1 == config2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
deleted file mode 100644
index 5f5ecd4..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.LinkedHashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Test Case for {@link LeastLoadPlacementPolicy}.
- */
-public class TestLeastLoadPlacementPolicy {
-
-    @Test(timeout = 10000)
-    public void testCalculateBalances() throws Exception {
-        int numSevers = new Random().nextInt(20) + 1;
-        int numStreams = new Random().nextInt(200) + 1;
-        RoutingService mockRoutingService = mock(RoutingService.class);
-        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
-        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
-            new EqualLoadAppraiser(),
-            mockRoutingService,
-            mockNamespace,
-            null,
-            Duration.fromSeconds(600),
-            new NullStatsLogger());
-        TreeSet<ServerLoad> serverLoads =
-            Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
-        long lowLoadPerServer = numStreams / numSevers;
-        long highLoadPerServer = lowLoadPerServer + 1;
-        for (ServerLoad serverLoad : serverLoads) {
-            long load = serverLoad.getLoad();
-            assertEquals(load, serverLoad.getStreamLoads().size());
-            assertTrue(String.format("Load %d is not between %d and %d",
-                load, lowLoadPerServer, highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer);
-        }
-    }
-
-    @Test(timeout = 10000)
-    public void testRefreshAndPlaceStream() throws Exception {
-        int numSevers = new Random().nextInt(20) + 1;
-        int numStreams = new Random().nextInt(200) + 1;
-        RoutingService mockRoutingService = mock(RoutingService.class);
-        when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
-        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
-        try {
-            when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
-        } catch (IOException e) {
-            fail();
-        }
-        PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class);
-        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
-            new EqualLoadAppraiser(),
-            mockRoutingService,
-            mockNamespace,
-            mockPlacementStateManager,
-            Duration.fromSeconds(600),
-            new NullStatsLogger());
-        leastLoadPlacementPolicy.refresh();
-
-        final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class);
-        verify(mockPlacementStateManager).saveOwnership(captor.capture());
-        TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>) captor.getValue();
-        ServerLoad next = serverLoads.first();
-        String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
-        assertEquals(next.getServer(), serverPlacement);
-    }
-
-    @Test(timeout = 10000)
-    public void testCalculateUnequalWeight() throws Exception {
-        int numSevers = new Random().nextInt(20) + 1;
-        int numStreams = new Random().nextInt(200) + 1;
-    /* use AtomicInteger to have a final object in answer method */
-        final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
-        RoutingService mockRoutingService = mock(RoutingService.class);
-        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
-        LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
-        when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
-            @Override
-            public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable {
-                int load = new Random().nextInt(100000);
-                if (load > maxLoad.get()) {
-                    maxLoad.set(load);
-                }
-                return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load));
-            }
-        });
-        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
-            mockLoadAppraiser,
-            mockRoutingService,
-            mockNamespace,
-            null,
-            Duration.fromSeconds(600),
-            new NullStatsLogger());
-        TreeSet<ServerLoad> serverLoads =
-            Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
-        long highestLoadSeen = Long.MIN_VALUE;
-        long lowestLoadSeen = Long.MAX_VALUE;
-        for (ServerLoad serverLoad : serverLoads) {
-            long load = serverLoad.getLoad();
-            if (load < lowestLoadSeen) {
-                lowestLoadSeen = load;
-            }
-            if (load > highestLoadSeen) {
-                highestLoadSeen = load;
-            }
-        }
-        assertTrue("Unexpected placement for " + numStreams + " streams to "
-                + numSevers + " servers : highest load = " + highestLoadSeen
-                + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(),
-            highestLoadSeen - lowestLoadSeen <= maxLoad.get());
-    }
-
-    private Set<SocketAddress> generateSocketAddresses(int num) {
-        LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>();
-        for (int i = 0; i < num; i++) {
-            socketAddresses.add(new InetSocketAddress(i));
-        }
-        return socketAddresses;
-    }
-
-    private Set<String> generateStreams(int num) {
-        LinkedHashSet<String> streams = new LinkedHashSet<String>();
-        for (int i = 0; i < num; i++) {
-            streams.add("stream_" + i);
-        }
-        return streams;
-    }
-
-    private Set<String> generateServers(int num) {
-        LinkedHashSet<String> servers = new LinkedHashSet<String>();
-        for (int i = 0; i < num; i++) {
-            servers.add("server_" + i);
-        }
-        return servers;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
deleted file mode 100644
index 5bd234f..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import org.junit.Test;
-
-/**
- * Test Case for {@link ServerLoad}.
- */
-public class TestServerLoad {
-
-    @Test(timeout = 60000)
-    public void testSerializeDeserialize() throws IOException {
-        final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
-        for (int i = 0; i < 20; i++) {
-            serverLoad.addStream(new StreamLoad("stream-" + i, i));
-        }
-        assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize()));
-    }
-
-    @Test(timeout = 60000)
-    public void testGetLoad() throws IOException {
-        final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
-        assertEquals(0, serverLoad.getLoad());
-        serverLoad.addStream(new StreamLoad("stream-" + 1, 3));
-        assertEquals(3, serverLoad.getLoad());
-        serverLoad.addStream(new StreamLoad("stream-" + 2, 7));
-        assertEquals(10, serverLoad.getLoad());
-        serverLoad.addStream(new StreamLoad("stream-" + 3, 1));
-        assertEquals(11, serverLoad.getLoad());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
deleted file mode 100644
index 36a6fed..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import org.junit.Test;
-
-/**
- * Test Case for {@link StreamLoad}.
- */
-public class TestStreamLoad {
-
-    @Test(timeout = 10000)
-    public void testSerializeDeserialize() throws IOException {
-        final String streamName = "aHellaRandomStreamName";
-        final int load = 1337;
-        final StreamLoad streamLoad = new StreamLoad(streamName, load);
-        assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
deleted file mode 100644
index 07ec5a5..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import java.io.IOException;
-import java.net.URI;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.curator.test.TestingServer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test Case for {@link ZKPlacementStateManager}.
- */
-public class TestZKPlacementStateManager {
-    private TestingServer zkTestServer;
-    private String zkServers;
-    private URI uri;
-    private ZKPlacementStateManager zkPlacementStateManager;
-
-    @Before
-    public void startZookeeper() throws Exception {
-        zkTestServer = new TestingServer(2181);
-        zkServers = "127.0.0.1:2181";
-        uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
-        zkPlacementStateManager =
-            new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
-    }
-
-    @Test(timeout = 60000)
-    public void testSaveLoad() throws Exception {
-        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
-        zkPlacementStateManager.saveOwnership(ownerships);
-        SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership();
-        assertEquals(ownerships, loadedOwnerships);
-
-        ownerships.add(new ServerLoad("emptyServer"));
-        zkPlacementStateManager.saveOwnership(ownerships);
-        loadedOwnerships = zkPlacementStateManager.loadOwnership();
-        assertEquals(ownerships, loadedOwnerships);
-
-        ServerLoad sl1 = new ServerLoad("server1");
-        sl1.addStream(new StreamLoad("stream1", 3));
-        sl1.addStream(new StreamLoad("stream2", 4));
-        ServerLoad sl2 = new ServerLoad("server2");
-        sl2.addStream(new StreamLoad("stream3", 1));
-        ownerships.add(sl1);
-        ownerships.add(sl2);
-        zkPlacementStateManager.saveOwnership(ownerships);
-        loadedOwnerships = zkPlacementStateManager.loadOwnership();
-        assertEquals(ownerships, loadedOwnerships);
-
-        loadedOwnerships.remove(sl1);
-        zkPlacementStateManager.saveOwnership(ownerships);
-        loadedOwnerships = zkPlacementStateManager.loadOwnership();
-        assertEquals(ownerships, loadedOwnerships);
-    }
-
-    private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc(
-        LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue,
-        int expectedNumServerLoads) throws InterruptedException {
-        TreeSet<ServerLoad> notification = notificationQueue.take();
-        assertNotNull(notification);
-        while (notification.size() < expectedNumServerLoads) {
-            notification = notificationQueue.take();
-        }
-        assertEquals(expectedNumServerLoads, notification.size());
-        return notification;
-    }
-
-    @Test(timeout = 60000)
-    public void testWatchIndefinitely() throws Exception {
-        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
-        ownerships.add(new ServerLoad("server1"));
-        final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
-            new LinkedBlockingQueue<TreeSet<ServerLoad>>();
-        PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
-            @Override
-            public void callback(TreeSet<ServerLoad> serverLoads) {
-                serverLoadNotifications.add(serverLoads);
-            }
-        };
-        zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
-        zkPlacementStateManager.watch(callback);
-        // cannot verify the callback here as it may call before the verify is called
-
-        zkPlacementStateManager.saveOwnership(ownerships);
-        assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));
-
-        ServerLoad server2 = new ServerLoad("server2");
-        server2.addStream(new StreamLoad("hella-important-stream", 415));
-        ownerships.add(server2);
-        zkPlacementStateManager.saveOwnership(ownerships);
-        assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
-    }
-
-    @Test(timeout = 60000)
-    public void testZkFormatting() throws Exception {
-        final String server = "host/10.0.0.0:31351";
-        final String zkFormattedServer = "host--10.0.0.0:31351";
-        URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
-        ZKPlacementStateManager zkPlacementStateManager =
-            new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
-        assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));
-        assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
-    }
-
-    @After
-    public void stopZookeeper() throws IOException {
-        zkTestServer.stop();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
deleted file mode 100644
index 56e9483..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import com.twitter.util.Await;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Case for StreamManager.
- */
-public class TestStreamManager {
-
-    @Rule
-    public TestName testName = new TestName();
-
-    ScheduledExecutorService mockExecutorService = mock(ScheduledExecutorService.class);
-
-    @Test(timeout = 60000)
-    public void testCollectionMethods() throws Exception {
-        Stream mockStream = mock(Stream.class);
-        when(mockStream.getStreamName()).thenReturn("stream1");
-        when(mockStream.getPartition()).thenReturn(new Partition("stream1", 0));
-        StreamFactory mockStreamFactory = mock(StreamFactory.class);
-        StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class);
-        StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class);
-        when(mockStreamFactory.create(
-                (String) any(),
-                (DynamicDistributedLogConfiguration) any(),
-                (StreamManager) any())).thenReturn(mockStream);
-        StreamManager streamManager = new StreamManagerImpl(
-                "",
-                new DistributedLogConfiguration(),
-                mockExecutorService,
-                mockStreamFactory,
-                mockPartitionConverter,
-                mockStreamConfigProvider,
-                mock(DistributedLogNamespace.class));
-
-        assertFalse(streamManager.isAcquired("stream1"));
-        assertEquals(0, streamManager.numAcquired());
-        assertEquals(0, streamManager.numCached());
-
-        streamManager.notifyAcquired(mockStream);
-        assertTrue(streamManager.isAcquired("stream1"));
-        assertEquals(1, streamManager.numAcquired());
-        assertEquals(0, streamManager.numCached());
-
-        streamManager.notifyReleased(mockStream);
-        assertFalse(streamManager.isAcquired("stream1"));
-        assertEquals(0, streamManager.numAcquired());
-        assertEquals(0, streamManager.numCached());
-
-        streamManager.notifyAcquired(mockStream);
-        assertTrue(streamManager.isAcquired("stream1"));
-        assertEquals(1, streamManager.numAcquired());
-        assertEquals(0, streamManager.numCached());
-
-        streamManager.notifyAcquired(mockStream);
-        assertTrue(streamManager.isAcquired("stream1"));
-        assertEquals(1, streamManager.numAcquired());
-        assertEquals(0, streamManager.numCached());
-
-        streamManager.notifyReleased(mockStream);
-        assertFalse(streamManager.isAcquired("stream1"));
-        assertEquals(0, streamManager.numAcquired());
-        assertEquals(0, streamManager.numCached());
-
-        streamManager.notifyReleased(mockStream);
-        assertFalse(streamManager.isAcquired("stream1"));
-        assertEquals(0, streamManager.numAcquired());
-        assertEquals(0, streamManager.numCached());
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateStream() throws Exception {
-        Stream mockStream = mock(Stream.class);
-        final String streamName = "stream1";
-        when(mockStream.getStreamName()).thenReturn(streamName);
-        StreamFactory mockStreamFactory = mock(StreamFactory.class);
-        StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class);
-        StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class);
-        when(mockStreamFactory.create(
-            (String) any(),
-            (DynamicDistributedLogConfiguration) any(),
-            (StreamManager) any())
-        ).thenReturn(mockStream);
-        DistributedLogNamespace dlNamespace = mock(DistributedLogNamespace.class);
-        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
-
-        StreamManager streamManager = new StreamManagerImpl(
-                "",
-                new DistributedLogConfiguration(),
-                executorService,
-                mockStreamFactory,
-                mockPartitionConverter,
-                mockStreamConfigProvider,
-                dlNamespace);
-
-        assertTrue(Await.ready(streamManager.createStreamAsync(streamName)).isReturn());
-        verify(dlNamespace).createLog(streamName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
deleted file mode 100644
index a18fda1..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.acl.DefaultAccessControlManager;
-import org.apache.distributedlog.exceptions.InternalServerException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import java.nio.ByteBuffer;
-import org.apache.bookkeeper.feature.SettableFeature;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Case for StreamOps.
- */
-public class TestStreamOp {
-
-    @Rule
-    public TestName testName = new TestName();
-
-    private WriteOp getWriteOp() {
-        SettableFeature disabledFeature = new SettableFeature("", 0);
-        return new WriteOp("test",
-            ByteBuffer.wrap("test".getBytes()),
-            new NullStatsLogger(),
-            new NullStatsLogger(),
-            new IdentityStreamPartitionConverter(),
-            new ServerConfiguration(),
-            (byte) 0,
-            null,
-            false,
-            disabledFeature,
-            DefaultAccessControlManager.INSTANCE);
-    }
-
-    @Test(timeout = 60000)
-    public void testResponseFailedTwice() throws Exception {
-        WriteOp writeOp = getWriteOp();
-        writeOp.fail(new InternalServerException("test1"));
-        writeOp.fail(new InternalServerException("test2"));
-
-        WriteResponse response = Await.result(writeOp.result());
-        assertEquals(StatusCode.INTERNAL_SERVER_ERROR, response.getHeader().getCode());
-        assertEquals(ResponseUtils.exceptionToHeader(new InternalServerException("test1")), response.getHeader());
-    }
-
-    @Test(timeout = 60000)
-    public void testResponseSucceededThenFailed() throws Exception {
-        AsyncLogWriter writer = mock(AsyncLogWriter.class);
-        when(writer.write((LogRecord) any())).thenReturn(Future.value(new DLSN(1, 2, 3)));
-        when(writer.getStreamName()).thenReturn("test");
-        WriteOp writeOp = getWriteOp();
-        writeOp.execute(writer, new Sequencer() {
-            public long nextId() {
-                return 0;
-            }
-        }, new Object());
-        writeOp.fail(new InternalServerException("test2"));
-
-        WriteResponse response = Await.result(writeOp.result());
-        assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
-    }
-}