You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:10 UTC

[03/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
deleted file mode 100644
index 58b5b2a..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import org.apache.distributedlog.DLMTestUtil;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.client.DistributedLogClientImpl;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.routing.LocalRoutingService;
-import org.apache.distributedlog.client.routing.RegionsRoutingService;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.service.stream.StreamManagerImpl;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-/**
- * Base test case for distributedlog servers.
- */
-public abstract class DistributedLogServerTestCase {
-
-    protected static DistributedLogConfiguration conf =
-            new DistributedLogConfiguration().setLockTimeout(10)
-                    .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
-    protected static DistributedLogConfiguration noAdHocConf =
-            new DistributedLogConfiguration().setLockTimeout(10).setCreateStreamIfNotExists(false)
-                    .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
-    protected static DistributedLogCluster dlCluster;
-    protected static DistributedLogCluster noAdHocCluster;
-
-    /**
-     * A distributedlog client wrapper for testing.
-     */
-    protected static class DLClient {
-        public final LocalRoutingService routingService;
-        public DistributedLogClientBuilder dlClientBuilder;
-        public final DistributedLogClientImpl dlClient;
-
-        protected DLClient(String name,
-                           String streamNameRegex,
-                           Optional<String> serverSideRoutingFinagleName) {
-            routingService = LocalRoutingService.newBuilder().build();
-            dlClientBuilder = DistributedLogClientBuilder.newBuilder()
-                        .name(name)
-                        .clientId(ClientId$.MODULE$.apply(name))
-                        .routingService(routingService)
-                        .streamNameRegex(streamNameRegex)
-                        .handshakeWithClientInfo(true)
-                        .clientBuilder(ClientBuilder.get()
-                            .hostConnectionLimit(1)
-                            .connectionTimeout(Duration.fromSeconds(1))
-                            .requestTimeout(Duration.fromSeconds(60)));
-            if (serverSideRoutingFinagleName.isPresent()) {
-                dlClientBuilder =
-                        dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get());
-            }
-            dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
-        }
-
-        public void handshake() {
-            dlClient.handshake();
-        }
-
-        public void shutdown() {
-            dlClient.close();
-        }
-    }
-
-    /**
-     * A distributedlog client wrapper that talks to two regions.
-     */
-    protected static class TwoRegionDLClient {
-
-        public final LocalRoutingService localRoutingService;
-        public final LocalRoutingService remoteRoutingService;
-        public final DistributedLogClientBuilder dlClientBuilder;
-        public final DistributedLogClientImpl dlClient;
-
-        protected TwoRegionDLClient(String name, Map<SocketAddress, String> regionMap) {
-            localRoutingService = new LocalRoutingService();
-            remoteRoutingService = new LocalRoutingService();
-            RegionsRoutingService regionsRoutingService =
-                    RegionsRoutingService.of(new DefaultRegionResolver(regionMap),
-                            localRoutingService, remoteRoutingService);
-            dlClientBuilder = DistributedLogClientBuilder.newBuilder()
-                        .name(name)
-                        .clientId(ClientId$.MODULE$.apply(name))
-                        .routingService(regionsRoutingService)
-                        .streamNameRegex(".*")
-                        .handshakeWithClientInfo(true)
-                        .maxRedirects(2)
-                        .clientBuilder(ClientBuilder.get()
-                            .hostConnectionLimit(1)
-                            .connectionTimeout(Duration.fromSeconds(1))
-                            .requestTimeout(Duration.fromSeconds(10)));
-            dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
-        }
-
-        public void shutdown() {
-            dlClient.close();
-        }
-    }
-
-    private final boolean clientSideRouting;
-    protected DLServer dlServer;
-    protected DLClient dlClient;
-    protected DLServer noAdHocServer;
-    protected DLClient noAdHocClient;
-
-    public static DistributedLogCluster createCluster(DistributedLogConfiguration conf) throws Exception {
-        return DistributedLogCluster.newBuilder()
-            .numBookies(3)
-            .shouldStartZK(true)
-            .zkServers("127.0.0.1")
-            .shouldStartProxy(false)
-            .dlConf(conf)
-            .bkConf(DLMTestUtil.loadTestBkConf())
-            .build();
-    }
-
-    @BeforeClass
-    public static void setupCluster() throws Exception {
-        dlCluster = createCluster(conf);
-        dlCluster.start();
-    }
-
-    public void setupNoAdHocCluster() throws Exception {
-        noAdHocCluster = createCluster(noAdHocConf);
-        noAdHocCluster.start();
-        noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false);
-        Optional<String> serverSideRoutingFinagleName = Optional.absent();
-        if (!clientSideRouting) {
-            serverSideRoutingFinagleName =
-                    Optional.of("inet!" + DLSocketAddress.toString(noAdHocServer.getAddress()));
-        }
-        noAdHocClient = createDistributedLogClient("no-ad-hoc-client", serverSideRoutingFinagleName);
-    }
-
-    public void tearDownNoAdHocCluster() throws Exception {
-        if (null != noAdHocClient) {
-            noAdHocClient.shutdown();
-        }
-        if (null != noAdHocServer) {
-            noAdHocServer.shutdown();
-        }
-    }
-
-    @AfterClass
-    public static void teardownCluster() throws Exception {
-        if (null != dlCluster) {
-            dlCluster.stop();
-        }
-        if (null != noAdHocCluster) {
-            noAdHocCluster.stop();
-        }
-    }
-
-    protected static URI getUri() {
-        return dlCluster.getUri();
-    }
-
-    protected DistributedLogServerTestCase(boolean clientSideRouting) {
-        this.clientSideRouting = clientSideRouting;
-    }
-
-    @Before
-    public void setup() throws Exception {
-        dlServer = createDistributedLogServer(7001);
-        Optional<String> serverSideRoutingFinagleName = Optional.absent();
-        if (!clientSideRouting) {
-            serverSideRoutingFinagleName =
-                    Optional.of("inet!" + DLSocketAddress.toString(dlServer.getAddress()));
-        }
-        dlClient = createDistributedLogClient("test", serverSideRoutingFinagleName);
-    }
-
-    @After
-    public void teardown() throws Exception {
-        if (null != dlClient) {
-            dlClient.shutdown();
-        }
-        if (null != dlServer) {
-            dlServer.shutdown();
-        }
-    }
-
-    protected DLServer createDistributedLogServer(int port) throws Exception {
-        return new DLServer(conf, dlCluster.getUri(), port, false);
-    }
-
-    protected DLServer createDistributedLogServer(DistributedLogConfiguration conf, int port)
-            throws Exception {
-        return new DLServer(conf, dlCluster.getUri(), port, false);
-    }
-
-    protected DLClient createDistributedLogClient(String clientName,
-                                                  Optional<String> serverSideRoutingFinagleName)
-            throws Exception {
-        return createDistributedLogClient(clientName, ".*", serverSideRoutingFinagleName);
-    }
-
-    protected DLClient createDistributedLogClient(String clientName,
-                                                  String streamNameRegex,
-                                                  Optional<String> serverSideRoutingFinagleName)
-            throws Exception {
-        return new DLClient(clientName, streamNameRegex, serverSideRoutingFinagleName);
-    }
-
-    protected TwoRegionDLClient createTwoRegionDLClient(String clientName,
-                                                        Map<SocketAddress, String> regionMap)
-            throws Exception {
-        return new TwoRegionDLClient(clientName, regionMap);
-    }
-
-    protected static void checkStreams(int numExpectedStreams, DLServer dlServer) {
-        StreamManager streamManager = dlServer.dlServer.getKey().getStreamManager();
-        assertEquals(numExpectedStreams, streamManager.numCached());
-        assertEquals(numExpectedStreams, streamManager.numAcquired());
-    }
-
-    protected static void checkStreams(Set<String> streams, DLServer dlServer) {
-        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
-        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
-        Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
-
-        assertEquals(streams.size(), cachedStreams.size());
-        assertEquals(streams.size(), acquiredStreams.size());
-        assertTrue(Sets.difference(streams, cachedStreams).isEmpty());
-        assertTrue(Sets.difference(streams, acquiredStreams).isEmpty());
-    }
-
-    protected static void checkStream(String name, DLClient dlClient, DLServer dlServer,
-                                      int expectedNumProxiesInClient, int expectedClientCacheSize,
-                                      int expectedServerCacheSize, boolean existedInServer, boolean existedInClient) {
-        Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
-        assertEquals(expectedNumProxiesInClient, distribution.size());
-
-        if (expectedNumProxiesInClient > 0) {
-            Map.Entry<SocketAddress, Set<String>> localEntry =
-                    distribution.entrySet().iterator().next();
-            assertEquals(dlServer.getAddress(), localEntry.getKey());
-            assertEquals(expectedClientCacheSize, localEntry.getValue().size());
-            assertEquals(existedInClient, localEntry.getValue().contains(name));
-        }
-
-        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
-        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
-        Set<String> acquiredStreams = streamManager.getCachedStreams().keySet();
-
-        assertEquals(expectedServerCacheSize, cachedStreams.size());
-        assertEquals(existedInServer, cachedStreams.contains(name));
-        assertEquals(expectedServerCacheSize, acquiredStreams.size());
-        assertEquals(existedInServer, acquiredStreams.contains(name));
-    }
-
-    protected static Map<SocketAddress, Set<String>> getStreamOwnershipDistribution(DLClient dlClient) {
-        return dlClient.dlClient.getStreamOwnershipDistribution();
-    }
-
-    protected static Set<String> getAllStreamsFromDistribution(Map<SocketAddress, Set<String>> distribution) {
-        Set<String> allStreams = new HashSet<String>();
-        for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) {
-            allStreams.addAll(entry.getValue());
-        }
-        return allStreams;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
deleted file mode 100644
index 29a3617..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
+++ /dev/null
@@ -1,720 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.base.Optional;
-import org.apache.distributedlog.AsyncLogReader;
-import org.apache.distributedlog.DLMTestUtil;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.LogReader;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.LogRecordWithDLSN;
-import org.apache.distributedlog.TestZooKeeperClientBuilder;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
-import org.apache.distributedlog.client.routing.LocalRoutingService;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.LogNotFoundException;
-import org.apache.distributedlog.impl.acl.ZKAccessControl;
-import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.stream.StreamManagerImpl;
-import org.apache.distributedlog.thrift.AccessControlEntry;
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.HeartbeatOptions;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteContext;
-import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for {@link DistributedLogServer}.
- */
-public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);
-
-    @Rule
-    public TestName testName = new TestName();
-
-    protected TestDistributedLogServerBase(boolean clientSideRouting) {
-        super(clientSideRouting);
-    }
-
-    /**
-     * {@link https://issues.apache.org/jira/browse/DL-27}.
-     */
-    @DistributedLogAnnotations.FlakyTest
-    @Ignore
-    @Test(timeout = 60000)
-    public void testBasicWrite() throws Exception {
-        String name = "dlserver-basic-write";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        for (long i = 1; i <= 10; i++) {
-            logger.debug("Write entry {} to stream {}.", i, name);
-            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())));
-        }
-
-        HeartbeatOptions hbOptions = new HeartbeatOptions();
-        hbOptions.setSendHeartBeatToReader(true);
-        // make sure the first log segment of each stream created
-        FutureUtils.result(dlClient.dlClient.heartbeat(name));
-
-        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = dlm.getInputStream(1);
-        int numRead = 0;
-        LogRecord r = reader.readNext(false);
-        while (null != r) {
-            ++numRead;
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead, i);
-            r = reader.readNext(false);
-        }
-        assertEquals(10, numRead);
-        reader.close();
-        dlm.close();
-    }
-
-    /**
-     * Sanity check to make sure both checksum flag values work.
-     */
-    @Test(timeout = 60000)
-    public void testChecksumFlag() throws Exception {
-        String name = "testChecksumFlag";
-        LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
-        routingService.addHost(name, dlServer.getAddress());
-        DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
-            .name(name)
-            .clientId(ClientId$.MODULE$.apply("test"))
-            .routingService(routingService)
-            .handshakeWithClientInfo(true)
-            .clientBuilder(ClientBuilder.get()
-                .hostConnectionLimit(1)
-                .connectionTimeout(Duration.fromSeconds(1))
-                .requestTimeout(Duration.fromSeconds(60)))
-            .checksum(false);
-        DistributedLogClient dlClient = dlClientBuilder.build();
-        Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes())));
-        dlClient.close();
-
-        dlClient = dlClientBuilder.checksum(true).build();
-        Await.result(dlClient.write(name, ByteBuffer.wrap(("2").getBytes())));
-        dlClient.close();
-    }
-
-    private void runSimpleBulkWriteTest(int writeCount) throws Exception {
-        String name = String.format("dlserver-bulk-write-%d", writeCount);
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-
-        logger.debug("Write {} entries to stream {}.", writeCount, name);
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-        assertEquals(futures.size(), writeCount);
-        for (Future<DLSN> future : futures) {
-            // No throw == pass.
-            DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
-        }
-
-        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = dlm.getInputStream(1);
-        int numRead = 0;
-        LogRecord r = reader.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead + 1, i);
-            ++numRead;
-            r = reader.readNext(false);
-        }
-        assertEquals(writeCount, numRead);
-        reader.close();
-        dlm.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWrite() throws Exception {
-        runSimpleBulkWriteTest(100);
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteSingleWrite() throws Exception {
-        runSimpleBulkWriteTest(1);
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteEmptyList() throws Exception {
-        String name = String.format("dlserver-bulk-write-%d", 0);
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-
-        assertEquals(0, futures.size());
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteNullArg() throws Exception {
-
-        String name = String.format("dlserver-bulk-write-%s", "null");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
-        writes.add(null);
-
-        try {
-            dlClient.dlClient.writeBulk(name, writes);
-            fail("should not have succeeded");
-        } catch (NullPointerException npe) {
-            // expected
-            logger.info("Expected to catch NullPointException.");
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteEmptyBuffer() throws Exception {
-        String name = String.format("dlserver-bulk-write-%s", "empty");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
-        writes.add(ByteBuffer.wrap(("").getBytes()));
-        writes.add(ByteBuffer.wrap(("").getBytes()));
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-        assertEquals(2, futures.size());
-        for (Future<DLSN> future : futures) {
-            // No throw == pass
-            DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
-        }
-    }
-
-    void failDueToWrongException(Exception ex) {
-        logger.info("testBulkWritePartialFailure: ", ex);
-        fail(String.format("failed with wrong exception %s", ex.getClass().getName()));
-    }
-
-    int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) {
-        int failed = 0;
-        for (int i = start; i < finish; i++) {
-            Future<DLSN> future = futures.get(i);
-            try {
-                Await.result(future, Duration.fromSeconds(10));
-                fail("future should have failed!");
-            } catch (DLException cre) {
-                ++failed;
-            } catch (Exception ex) {
-                failDueToWrongException(ex);
-            }
-        }
-        return failed;
-    }
-
-    void validateFailedAsLogRecordTooLong(Future<DLSN> future) {
-        try {
-            Await.result(future, Duration.fromSeconds(10));
-            fail("should have failed");
-        } catch (DLException dle) {
-            assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode());
-        } catch (Exception ex) {
-            failDueToWrongException(ex);
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWritePartialFailure() throws Exception {
-        String name = String.format("dlserver-bulk-write-%s", "partial-failure");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        final int writeCount = 100;
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-        // Too big, will cause partial failure.
-        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
-        writes.add(buf);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-
-        // Count succeeded.
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-        int succeeded = 0;
-        for (int i = 0; i < writeCount; i++) {
-            Future<DLSN> future = futures.get(i);
-            try {
-                Await.result(future, Duration.fromSeconds(10));
-                ++succeeded;
-            } catch (Exception ex) {
-                failDueToWrongException(ex);
-            }
-        }
-
-        validateFailedAsLogRecordTooLong(futures.get(writeCount));
-        FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
-        assertEquals(writeCount, succeeded);
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception {
-        String name = String.format("dlserver-bulk-write-%s", "first-write-failed");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        final int writeCount = 100;
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
-        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
-        writes.add(buf);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-        validateFailedAsLogRecordTooLong(futures.get(0));
-        FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1)));
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteTotalFailureLostLock() throws Exception {
-        String name = String.format("dlserver-bulk-write-%s", "lost-lock");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        final int writeCount = 8;
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
-        ByteBuffer buf = ByteBuffer.allocate(8);
-        writes.add(buf);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-        // Warm it up with a write.
-        Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8)));
-
-        // Failpoint a lost lock, make sure the failure gets promoted to an operation failure.
-        DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft();
-        try {
-            FailpointUtils.setFailpoint(
-                FailpointUtils.FailPointName.FP_WriteInternalLostLock,
-                FailpointUtils.FailPointActions.FailPointAction_Default
-            );
-            Future<BulkWriteResponse> futures = svcImpl.writeBulkWithContext(name, writes, new WriteContext());
-            assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(futures).header.code);
-        } finally {
-            FailpointUtils.removeFailpoint(
-                FailpointUtils.FailPointName.FP_WriteInternalLostLock
-            );
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testHeartbeat() throws Exception {
-        String name = "dlserver-heartbeat";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        for (long i = 1; i <= 10; i++) {
-            logger.debug("Send heartbeat {} to stream {}.", i, name);
-            dlClient.dlClient.check(name).get();
-        }
-
-        logger.debug("Write entry one to stream {}.", name);
-        dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes())).get();
-
-        Thread.sleep(1000);
-
-        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
-        int numRead = 0;
-        // eid=0 => control records
-        // other 9 heartbeats will not trigger writing any control records.
-        // eid=1 => user entry
-        long startEntryId = 1;
-        LogRecordWithDLSN r = reader.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead + 1, i);
-            assertEquals(r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)), 0);
-            ++numRead;
-            ++startEntryId;
-            r = reader.readNext(false);
-        }
-        assertEquals(1, numRead);
-    }
-
-    @Test(timeout = 60000)
-    public void testFenceWrite() throws Exception {
-        String name = "dlserver-fence-write";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        for (long i = 1; i <= 10; i++) {
-            logger.debug("Write entry {} to stream {}.", i, name);
-            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
-        }
-
-        Thread.sleep(1000);
-
-        logger.info("Fencing stream {}.", name);
-        DLMTestUtil.fenceStream(conf, getUri(), name);
-        logger.info("Fenced stream {}.", name);
-
-        for (long i = 11; i <= 20; i++) {
-            logger.debug("Write entry {} to stream {}.", i, name);
-            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
-        }
-
-        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = dlm.getInputStream(1);
-        int numRead = 0;
-        LogRecord r = reader.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead + 1, i);
-            ++numRead;
-            r = reader.readNext(false);
-        }
-        assertEquals(20, numRead);
-        reader.close();
-        dlm.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testDeleteStream() throws Exception {
-        String name = "dlserver-delete-stream";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        long txid = 101;
-        for (long i = 1; i <= 10; i++) {
-            long curTxId = txid++;
-            logger.debug("Write entry {} to stream {}.", curTxId, name);
-            dlClient.dlClient.write(name,
-                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-        }
-
-        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
-
-        dlClient.dlClient.delete(name).get();
-
-        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
-
-        Thread.sleep(1000);
-
-        DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri());
-        AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN));
-        try {
-            FutureUtils.result(reader101.readNext());
-            fail("Should fail with LogNotFoundException since the stream is deleted");
-        } catch (LogNotFoundException lnfe) {
-            // expected
-        }
-        FutureUtils.result(reader101.asyncClose());
-        dlm101.close();
-
-        txid = 201;
-        for (long i = 1; i <= 10; i++) {
-            long curTxId = txid++;
-            logger.debug("Write entry {} to stream {}.", curTxId, name);
-            DLSN dlsn = dlClient.dlClient.write(name,
-                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-        }
-        Thread.sleep(1000);
-
-        DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader201 = dlm201.getInputStream(1);
-        int numRead = 0;
-        int curTxId = 201;
-        LogRecord r = reader201.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(curTxId++, i);
-            ++numRead;
-            r = reader201.readNext(false);
-        }
-        assertEquals(10, numRead);
-        reader201.close();
-        dlm201.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateStream() throws Exception {
-        try {
-            setupNoAdHocCluster();
-            final String name = "dlserver-create-stream";
-
-            noAdHocClient.routingService.addHost("dlserver-create-stream", noAdHocServer.getAddress());
-            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
-
-            long txid = 101;
-            for (long i = 1; i <= 10; i++) {
-                long curTxId = txid++;
-                logger.debug("Write entry {} to stream {}.", curTxId, name);
-                noAdHocClient.dlClient.write(name,
-                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-            }
-
-            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-        } finally {
-            tearDownNoAdHocCluster();
-        }
-    }
-
-    /**
-     * This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing.
-     */
-    @Test(timeout = 60000)
-    public void testCreateStreamTwice() throws Exception {
-        try {
-            setupNoAdHocCluster();
-            final String name = "dlserver-create-stream-twice";
-
-            noAdHocClient.routingService.addHost("dlserver-create-stream-twice", noAdHocServer.getAddress());
-            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
-
-            long txid = 101;
-            for (long i = 1; i <= 10; i++) {
-                long curTxId = txid++;
-                logger.debug("Write entry {} to stream {}.", curTxId, name);
-                noAdHocClient.dlClient.write(name,
-                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-            }
-
-            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-
-            // create again
-            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
-            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-        } finally {
-            tearDownNoAdHocCluster();
-        }
-    }
-
-
-
-    @Test(timeout = 60000)
-    public void testTruncateStream() throws Exception {
-        String name = "dlserver-truncate-stream";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        long txid = 1;
-        Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>();
-        for (int s = 1; s <= 2; s++) {
-            for (long i = 1; i <= 10; i++) {
-                long curTxId = txid++;
-                logger.debug("Write entry {} to stream {}.", curTxId, name);
-                DLSN dlsn = dlClient.dlClient.write(name,
-                        ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-                txid2DLSN.put(curTxId, dlsn);
-            }
-            if (s == 1) {
-                dlClient.dlClient.release(name).get();
-            }
-        }
-
-        DLSN dlsnToDelete = txid2DLSN.get(11L);
-        dlClient.dlClient.truncate(name, dlsnToDelete).get();
-
-        DistributedLogManager readDLM = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = readDLM.getInputStream(1);
-        int numRead = 0;
-        int curTxId = 11;
-        LogRecord r = reader.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(curTxId++, i);
-            ++numRead;
-            r = reader.readNext(false);
-        }
-        assertEquals(10, numRead);
-        reader.close();
-        readDLM.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testRequestDenied() throws Exception {
-        String name = "request-denied";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        AccessControlEntry ace = new AccessControlEntry();
-        ace.setDenyWrite(true);
-        ZooKeeperClient zkc = TestZooKeeperClientBuilder
-                .newBuilder()
-                .uri(getUri())
-                .connectionTimeoutMs(60000)
-                .sessionTimeoutMs(60000)
-                .build();
-        DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
-        BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
-        String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name;
-        ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath);
-        accessControl.create(zkc);
-
-        AccessControlManager acm = dlNamespace.createAccessControlManager();
-        while (acm.allowWrite(name)) {
-            Thread.sleep(100);
-        }
-
-        try {
-            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
-            fail("Should fail with request denied exception");
-        } catch (DLException dle) {
-            assertEquals(StatusCode.REQUEST_DENIED, dle.getCode());
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testNoneStreamNameRegex() throws Exception {
-        String streamNamePrefix = "none-stream-name-regex-";
-        int numStreams = 5;
-        Set<String> streams = new HashSet<String>();
-
-        for (int i = 0; i < numStreams; i++) {
-            streams.add(streamNamePrefix + i);
-        }
-        testStreamNameRegex(streams, ".*", streams);
-    }
-
-    @Test(timeout = 60000)
-    public void testStreamNameRegex() throws Exception {
-        String streamNamePrefix = "stream-name-regex-";
-        int numStreams = 5;
-        Set<String> streams = new HashSet<String>();
-        Set<String> expectedStreams = new HashSet<String>();
-        String streamNameRegex = streamNamePrefix + "1";
-
-        for (int i = 0; i < numStreams; i++) {
-            streams.add(streamNamePrefix + i);
-        }
-        expectedStreams.add(streamNamePrefix + "1");
-
-        testStreamNameRegex(streams, streamNameRegex, expectedStreams);
-    }
-
-    private void testStreamNameRegex(Set<String> streams, String streamNameRegex,
-                                     Set<String> expectedStreams)
-            throws Exception {
-        for (String streamName : streams) {
-            dlClient.routingService.addHost(streamName, dlServer.getAddress());
-            Await.result(dlClient.dlClient.write(streamName,
-                    ByteBuffer.wrap(streamName.getBytes(UTF_8))));
-        }
-
-        DLClient client = createDistributedLogClient(
-                "test-stream-name-regex",
-                streamNameRegex,
-                Optional.<String>absent());
-        try {
-            client.routingService.addHost("unknown", dlServer.getAddress());
-            client.handshake();
-            Map<SocketAddress, Set<String>> distribution =
-                    client.dlClient.getStreamOwnershipDistribution();
-            assertEquals(1, distribution.size());
-            Set<String> cachedStreams = distribution.values().iterator().next();
-            assertNotNull(cachedStreams);
-            assertEquals(expectedStreams.size(), cachedStreams.size());
-
-            for (String streamName : cachedStreams) {
-                assertTrue(expectedStreams.contains(streamName));
-            }
-        } finally {
-            client.shutdown();
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testReleaseStream() throws Exception {
-        String name = "dlserver-release-stream";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
-        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
-
-        // release the stream
-        Await.result(dlClient.dlClient.release(name));
-        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
-    }
-
-    protected void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, int expectedServerCacheSize,
-                             String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) {
-        Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
-        assertEquals(expectedNumProxiesInClient, distribution.size());
-
-        if (expectedNumProxiesInClient > 0) {
-            Map.Entry<SocketAddress, Set<String>> localEntry =
-                    distribution.entrySet().iterator().next();
-            assertEquals(owner, localEntry.getKey());
-            assertEquals(expectedClientCacheSize, localEntry.getValue().size());
-            assertEquals(existedInClient, localEntry.getValue().contains(name));
-        }
-
-
-        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
-        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
-        Set<String> acquiredStreams = streamManager.getCachedStreams().keySet();
-
-        assertEquals(expectedServerCacheSize, cachedStreams.size());
-        assertEquals(existedInServer, cachedStreams.contains(name));
-        assertEquals(expectedServerCacheSize, acquiredStreams.size());
-        assertEquals(existedInServer, acquiredStreams.contains(name));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
deleted file mode 100644
index c7ae960..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.fail;
-
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.util.Await;
-import java.nio.ByteBuffer;
-import org.junit.Test;
-
-/**
- * Test the server with client side routing.
- */
-public class TestDistributedLogServerClientRouting extends TestDistributedLogServerBase {
-
-    public TestDistributedLogServerClientRouting() {
-        super(true);
-    }
-
-    @Test(timeout = 60000)
-    public void testAcceptNewStream() throws Exception {
-        String name = "dlserver-accept-new-stream";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-        dlClient.routingService.setAllowRetrySameHost(false);
-
-        Await.result(dlClient.dlClient.setAcceptNewStream(false));
-
-        try {
-            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
-            fail("Should fail because the proxy couldn't accept new stream");
-        } catch (NoBrokersAvailableException nbae) {
-            // expected
-        }
-        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
-
-        Await.result(dlServer.dlServer.getLeft().setAcceptNewStream(true));
-        Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
-        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
deleted file mode 100644
index 12416a3..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-/**
- * Test the server with client side routing.
- */
-public class TestDistributedLogServerServerRouting extends TestDistributedLogServerBase {
-
-    public TestDistributedLogServerServerRouting() {
-        super(false);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
deleted file mode 100644
index e5d75c2..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
+++ /dev/null
@@ -1,833 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.TestDistributedLogBase;
-import org.apache.distributedlog.acl.DefaultAccessControlManager;
-import org.apache.distributedlog.client.routing.LocalRoutingService;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.exceptions.StreamUnavailableException;
-import org.apache.distributedlog.service.config.NullStreamConfigProvider;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
-import org.apache.distributedlog.service.stream.Stream;
-import org.apache.distributedlog.service.stream.StreamImpl;
-import org.apache.distributedlog.service.stream.StreamImpl.StreamStatus;
-import org.apache.distributedlog.service.stream.StreamManagerImpl;
-import org.apache.distributedlog.service.stream.WriteOp;
-import org.apache.distributedlog.service.streamset.DelimiterStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.thrift.service.HeartbeatOptions;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteContext;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.ProtocolUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.feature.SettableFeature;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for DistributedLog Service.
- */
-public class TestDistributedLogService extends TestDistributedLogBase {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class);
-
-    @Rule
-    public TestName testName = new TestName();
-
-    private ServerConfiguration serverConf;
-    private DistributedLogConfiguration dlConf;
-    private URI uri;
-    private final CountDownLatch latch = new CountDownLatch(1);
-    private DistributedLogServiceImpl service;
-
-    @Before
-    @Override
-    public void setup() throws Exception {
-        super.setup();
-        dlConf = new DistributedLogConfiguration();
-        dlConf.addConfiguration(conf);
-        dlConf.setLockTimeout(0)
-                .setOutputBufferSize(0)
-                .setPeriodicFlushFrequencyMilliSeconds(10)
-                .setSchedulerShutdownTimeoutMs(100);
-        serverConf = newLocalServerConf();
-        uri = createDLMURI("/" + testName.getMethodName());
-        ensureURICreated(uri);
-        service = createService(serverConf, dlConf, latch);
-    }
-
-    @After
-    @Override
-    public void teardown() throws Exception {
-        if (null != service) {
-            service.shutdown();
-        }
-        super.teardown();
-    }
-
-    private DistributedLogConfiguration newLocalConf() {
-        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
-        confLocal.addConfiguration(dlConf);
-        return confLocal;
-    }
-
-    private ServerConfiguration newLocalServerConf() {
-        ServerConfiguration serverConf = new ServerConfiguration();
-        serverConf.loadConf(dlConf);
-        serverConf.setServerThreads(1);
-        return serverConf;
-    }
-
-    private DistributedLogServiceImpl createService(
-            ServerConfiguration serverConf,
-            DistributedLogConfiguration dlConf) throws Exception {
-        return createService(serverConf, dlConf, new CountDownLatch(1));
-    }
-
-    private DistributedLogServiceImpl createService(
-            ServerConfiguration serverConf,
-            DistributedLogConfiguration dlConf,
-            CountDownLatch latch) throws Exception {
-        // Build the stream partition converter
-        StreamPartitionConverter converter;
-        try {
-            converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
-        } catch (ConfigurationException e) {
-            logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
-                    IdentityStreamPartitionConverter.class.getName());
-            converter = new IdentityStreamPartitionConverter();
-        }
-        return new DistributedLogServiceImpl(
-            serverConf,
-            dlConf,
-            ConfUtils.getConstDynConf(dlConf),
-            new NullStreamConfigProvider(),
-            uri,
-            converter,
-            new LocalRoutingService(),
-            NullStatsLogger.INSTANCE,
-            NullStatsLogger.INSTANCE,
-            latch,
-            new EqualLoadAppraiser());
-    }
-
-    private StreamImpl createUnstartedStream(DistributedLogServiceImpl service,
-                                             String name) throws Exception {
-        StreamImpl stream = (StreamImpl) service.newStream(name);
-        stream.initialize();
-        return stream;
-    }
-
-    private ByteBuffer createRecord(long txid) {
-        return ByteBuffer.wrap(("record-" + txid).getBytes(UTF_8));
-    }
-
-    private WriteOp createWriteOp(DistributedLogServiceImpl service,
-                                  String streamName,
-                                  long txid) {
-        ByteBuffer data = createRecord(txid);
-        return service.newWriteOp(streamName, data, null);
-    }
-
-    @Test(timeout = 60000)
-    public void testAcquireStreams() throws Exception {
-        String streamName = testName.getMethodName();
-        StreamImpl s0 = createUnstartedStream(service, streamName);
-        ServerConfiguration serverConf1 = new ServerConfiguration();
-        serverConf1.addConfiguration(serverConf);
-        serverConf1.setServerPort(9999);
-        DistributedLogServiceImpl service1 = createService(serverConf1, dlConf);
-        StreamImpl s1 = createUnstartedStream(service1, streamName);
-
-        // create write ops
-        WriteOp op0 = createWriteOp(service, streamName, 0L);
-        s0.submit(op0);
-
-        WriteOp op1 = createWriteOp(service1, streamName, 1L);
-        s1.submit(op1);
-
-        // check pending size
-        assertEquals("Write Op 0 should be pending in service 0",
-                1, s0.numPendingOps());
-        assertEquals("Write Op 1 should be pending in service 1",
-                1, s1.numPendingOps());
-
-        // start acquiring s0
-        s0.start();
-        WriteResponse wr0 = Await.result(op0.result());
-        assertEquals("Op 0 should succeed",
-                StatusCode.SUCCESS, wr0.getHeader().getCode());
-        assertEquals("Service 0 should acquire stream",
-                StreamStatus.INITIALIZED, s0.getStatus());
-        assertNotNull(s0.getManager());
-        assertNotNull(s0.getWriter());
-        assertNull(s0.getLastException());
-
-        // start acquiring s1
-        s1.start();
-        WriteResponse wr1 = Await.result(op1.result());
-        assertEquals("Op 1 should fail",
-                StatusCode.FOUND, wr1.getHeader().getCode());
-        // the stream will be set to ERROR and then be closed.
-        assertTrue("Service 1 should be in unavailable state",
-                StreamStatus.isUnavailable(s1.getStatus()));
-        assertNotNull(s1.getManager());
-        assertNull(s1.getWriter());
-        assertNotNull(s1.getLastException());
-        assertTrue(s1.getLastException() instanceof OwnershipAcquireFailedException);
-
-        service1.shutdown();
-    }
-
-    @Test(timeout = 60000)
-    public void testAcquireStreamsWhenExceedMaxCachedPartitions() throws Exception {
-        String streamName = testName.getMethodName() + "_0000";
-
-        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
-        confLocal.addConfiguration(dlConf);
-        confLocal.setMaxCachedPartitionsPerProxy(1);
-
-        ServerConfiguration serverConfLocal = new ServerConfiguration();
-        serverConfLocal.addConfiguration(serverConf);
-        serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class);
-
-        DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal);
-        Stream stream = serviceLocal.getLogWriter(streamName);
-
-        // stream is cached
-        assertNotNull(stream);
-        assertEquals(1, serviceLocal.getStreamManager().numCached());
-
-        // create write ops
-        WriteOp op0 = createWriteOp(service, streamName, 0L);
-        stream.submit(op0);
-        WriteResponse wr0 = Await.result(op0.result());
-        assertEquals("Op 0 should succeed",
-                StatusCode.SUCCESS, wr0.getHeader().getCode());
-        assertEquals(1, serviceLocal.getStreamManager().numAcquired());
-
-        // should fail to acquire another partition
-        try {
-            serviceLocal.getLogWriter(testName.getMethodName() + "_0001");
-            fail("Should fail to acquire new streams");
-        } catch (StreamUnavailableException sue) {
-            // expected
-        }
-        assertEquals(1, serviceLocal.getStreamManager().numCached());
-        assertEquals(1, serviceLocal.getStreamManager().numAcquired());
-
-        // should be able to acquire partitions from other streams
-        String anotherStreamName = testName.getMethodName() + "-another_0001";
-        Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName);
-        assertNotNull(anotherStream);
-        assertEquals(2, serviceLocal.getStreamManager().numCached());
-
-        // create write ops
-        WriteOp op1 = createWriteOp(service, anotherStreamName, 0L);
-        anotherStream.submit(op1);
-        WriteResponse wr1 = Await.result(op1.result());
-        assertEquals("Op 1 should succeed",
-                StatusCode.SUCCESS, wr1.getHeader().getCode());
-        assertEquals(2, serviceLocal.getStreamManager().numAcquired());
-    }
-
-    @Test(timeout = 60000)
-    public void testAcquireStreamsWhenExceedMaxAcquiredPartitions() throws Exception {
-        String streamName = testName.getMethodName() + "_0000";
-
-        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
-        confLocal.addConfiguration(dlConf);
-        confLocal.setMaxCachedPartitionsPerProxy(-1);
-        confLocal.setMaxAcquiredPartitionsPerProxy(1);
-
-        ServerConfiguration serverConfLocal = new ServerConfiguration();
-        serverConfLocal.addConfiguration(serverConf);
-        serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class);
-
-        DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal);
-        Stream stream = serviceLocal.getLogWriter(streamName);
-
-        // stream is cached
-        assertNotNull(stream);
-        assertEquals(1, serviceLocal.getStreamManager().numCached());
-
-        // create write ops
-        WriteOp op0 = createWriteOp(service, streamName, 0L);
-        stream.submit(op0);
-        WriteResponse wr0 = Await.result(op0.result());
-        assertEquals("Op 0 should succeed",
-                StatusCode.SUCCESS, wr0.getHeader().getCode());
-        assertEquals(1, serviceLocal.getStreamManager().numAcquired());
-
-        // should be able to cache partitions from same stream
-        String anotherStreamName = testName.getMethodName() + "_0001";
-        Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName);
-        assertNotNull(anotherStream);
-        assertEquals(2, serviceLocal.getStreamManager().numCached());
-
-        // create write ops
-        WriteOp op1 = createWriteOp(service, anotherStreamName, 0L);
-        anotherStream.submit(op1);
-        WriteResponse wr1 = Await.result(op1.result());
-        assertEquals("Op 1 should fail",
-                StatusCode.STREAM_UNAVAILABLE, wr1.getHeader().getCode());
-        assertEquals(1, serviceLocal.getStreamManager().numAcquired());
-    }
-
-    @Test(timeout = 60000)
-    public void testCloseShouldErrorOutPendingOps() throws Exception {
-        String streamName = testName.getMethodName();
-        StreamImpl s = createUnstartedStream(service, streamName);
-
-        int numWrites = 10;
-        List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
-        for (int i = 0; i < numWrites; i++) {
-            WriteOp op = createWriteOp(service, streamName, i);
-            s.submit(op);
-            futureList.add(op.result());
-        }
-        assertEquals(numWrites, s.numPendingOps());
-        Await.result(s.requestClose("close stream"));
-        assertEquals("Stream " + streamName + " is set to " + StreamStatus.CLOSED,
-                StreamStatus.CLOSED, s.getStatus());
-        for (int i = 0; i < numWrites; i++) {
-            Future<WriteResponse> future = futureList.get(i);
-            WriteResponse wr = Await.result(future);
-            assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE,
-                    StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode());
-        }
-    }
-
-    /**
-     * Requesting close twice on the same stream must be harmless: both close
-     * futures complete, the stream ends up {@link StreamStatus#CLOSED}, and all
-     * pending ops fail with {@link StatusCode#STREAM_UNAVAILABLE}.
-     */
-    @Test(timeout = 60000)
-    public void testCloseTwice() throws Exception {
-        final String streamName = testName.getMethodName();
-        final StreamImpl stream = createUnstartedStream(service, streamName);
-
-        final int numWrites = 10;
-        List<Future<WriteResponse>> pendingResults =
-                new ArrayList<Future<WriteResponse>>(numWrites);
-        for (int i = 0; i < numWrites; i++) {
-            WriteOp writeOp = createWriteOp(service, streamName, i);
-            stream.submit(writeOp);
-            pendingResults.add(writeOp.result());
-        }
-        assertEquals(numWrites, stream.numPendingOps());
-
-        // first close request: the stream may already have reached CLOSED
-        Future<Void> firstClose = stream.requestClose("close 0");
-        assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
-                StreamStatus.CLOSING == stream.getStatus()
-                    || StreamStatus.CLOSED == stream.getStatus());
-        // second close request while the first may still be in flight
-        Future<Void> secondClose = stream.requestClose("close 1");
-        assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
-                StreamStatus.CLOSING == stream.getStatus()
-                    || StreamStatus.CLOSED == stream.getStatus());
-
-        // both close futures must complete and leave the stream CLOSED
-        Await.result(firstClose);
-        assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
-                StreamStatus.CLOSED, stream.getStatus());
-        Await.result(secondClose);
-        assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
-                StreamStatus.CLOSED, stream.getStatus());
-
-        for (Future<WriteResponse> pending : pendingResults) {
-            WriteResponse response = Await.result(pending);
-            assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE,
-                    StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
-        }
-    }
-
-    /**
-     * Ops submitted after close has been requested, and again after the close
-     * has completed, must fail with {@link StatusCode#STREAM_UNAVAILABLE}.
-     */
-    @Test(timeout = 60000)
-    public void testFailRequestsDuringClosing() throws Exception {
-        final String streamName = testName.getMethodName();
-        final StreamImpl stream = createUnstartedStream(service, streamName);
-
-        // request the close but do not wait for it yet
-        Future<Void> pendingClose = stream.requestClose("close");
-        assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
-                StreamStatus.CLOSING == stream.getStatus()
-                    || StreamStatus.CLOSED == stream.getStatus());
-
-        // an op submitted after the close request must be rejected
-        WriteOp opWhileClosing = createWriteOp(service, streamName, 0L);
-        stream.submit(opWhileClosing);
-        WriteResponse closingResponse = Await.result(opWhileClosing.result());
-        assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closing",
-                StatusCode.STREAM_UNAVAILABLE, closingResponse.getHeader().getCode());
-
-        // once the close has completed, ops are still rejected
-        Await.result(pendingClose);
-        assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
-                StreamStatus.CLOSED, stream.getStatus());
-        WriteOp opAfterClose = createWriteOp(service, streamName, 1L);
-        stream.submit(opAfterClose);
-        WriteResponse closedResponse = Await.result(opAfterClose.result());
-        assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closed",
-                StatusCode.STREAM_UNAVAILABLE, closedResponse.getHeader().getCode());
-    }
-
-    /**
-     * With a 200ms service timeout and writes that are never flushed (huge output
-     * buffer, no immediate or periodic flush), the stream is expected to move to
-     * CLOSING/CLOSED on its own, fail the buffered writes, and eventually be
-     * removed from both the cached and the acquired stream maps.
-     */
-    @Test(timeout = 60000)
-    public void testServiceTimeout() throws Exception {
-        DistributedLogConfiguration confLocal = newLocalConf();
-        confLocal.setOutputBufferSize(Integer.MAX_VALUE)
-                .setImmediateFlushEnabled(false)
-                .setPeriodicFlushFrequencyMilliSeconds(0);
-        ServerConfiguration serverConfLocal = newLocalServerConf();
-        serverConfLocal.addConfiguration(serverConf);
-        serverConfLocal.setServiceTimeoutMs(200)
-                .setStreamProbationTimeoutMs(100);
-        String streamName = testName.getMethodName();
-        // create a new service with 200ms timeout
-        DistributedLogServiceImpl localService = createService(serverConfLocal, confLocal);
-        try {
-            StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
-
-            int numWrites = 10;
-            List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
-            for (int i = 0; i < numWrites; i++) {
-                futureList.add(localService.write(streamName, createRecord(i)));
-            }
-
-            assertTrue("Stream " + streamName + " should be cached",
-                    streamManager.getCachedStreams().containsKey(streamName));
-
-            StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName);
-            // the stream should be set CLOSING
-            while (StreamStatus.CLOSING != s.getStatus()
-                && StreamStatus.CLOSED != s.getStatus()) {
-                TimeUnit.MILLISECONDS.sleep(20);
-            }
-            assertNotNull("Writer should be initialized", s.getWriter());
-            assertNull("No exception should be thrown", s.getLastException());
-            Future<Void> closeFuture = s.getCloseFuture();
-            Await.result(closeFuture);
-            for (int i = 0; i < numWrites; i++) {
-                // once the stream is closed every write must already be completed
-                assertTrue("Write " + i + " should be completed once the stream is closed",
-                        futureList.get(i).isDefined());
-                WriteResponse response = Await.result(futureList.get(i));
-                // any of the three failure codes is accepted; report the actual one on mismatch
-                assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR
-                        + ", " + StatusCode.WRITE_EXCEPTION
-                        + " or " + StatusCode.WRITE_CANCELLED_EXCEPTION
-                        + " but got " + response.getHeader().getCode(),
-                        StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
-                            || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
-                            || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
-            }
-
-            // wait for the stream to be evicted from the cache
-            while (streamManager.getCachedStreams().containsKey(streamName)) {
-                TimeUnit.MILLISECONDS.sleep(20);
-            }
-
-            assertFalse("Stream should be removed from cache",
-                    streamManager.getCachedStreams().containsKey(streamName));
-            assertFalse("Stream should be removed from acquired cache",
-                    streamManager.getAcquiredStreams().containsKey(streamName));
-        } finally {
-            // always shut the local service down, even when an assertion above fails
-            localService.shutdown();
-        }
-    }
-
-    private DistributedLogServiceImpl createConfiguredLocalService() throws Exception {
-        DistributedLogConfiguration confLocal = newLocalConf();
-        confLocal.setOutputBufferSize(0)
-                .setImmediateFlushEnabled(true)
-                .setPeriodicFlushFrequencyMilliSeconds(0);
-        return createService(serverConf, confLocal);
-    }
-
-    /**
-     * Returns a fresh array-backed buffer holding the bytes of {@code "test-data"}.
-     *
-     * <p>The charset is pinned to UTF-8 so the payload (and any checksum computed
-     * over it) does not depend on the platform default encoding.
-     *
-     * @return a new {@link ByteBuffer} wrapping the test payload
-     */
-    private ByteBuffer getTestDataBuffer() {
-        return ByteBuffer.wrap("test-data".getBytes(java.nio.charset.StandardCharsets.UTF_8));
-    }
-
-    /**
-     * With durable writes disabled on both the log and the server configuration,
-     * writes complete even though nothing is flushed, and each response carries
-     * {@code DLSN.InvalidDLSN}.
-     */
-    @Test(timeout = 60000)
-    public void testNonDurableWrite() throws Exception {
-        DistributedLogConfiguration confLocal = newLocalConf();
-        confLocal.setOutputBufferSize(Integer.MAX_VALUE)
-                .setImmediateFlushEnabled(false)
-                .setPeriodicFlushFrequencyMilliSeconds(0)
-                .setDurableWriteEnabled(false);
-        ServerConfiguration serverConfLocal = new ServerConfiguration();
-        serverConfLocal.addConfiguration(serverConf);
-        serverConfLocal.enableDurableWrite(false);
-        serverConfLocal.setServiceTimeoutMs(Integer.MAX_VALUE)
-                .setStreamProbationTimeoutMs(Integer.MAX_VALUE);
-        String streamName = testName.getMethodName();
-        DistributedLogServiceImpl localService =
-                createService(serverConfLocal, confLocal);
-        try {
-            StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
-
-            int numWrites = 10;
-            List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
-            for (int i = 0; i < numWrites; i++) {
-                futureList.add(localService.write(streamName, createRecord(i)));
-            }
-            assertTrue("Stream " + streamName + " should be cached",
-                    streamManager.getCachedStreams().containsKey(streamName));
-            List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList));
-            for (WriteResponse wr : resultList) {
-                assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn()));
-            }
-        } finally {
-            // always shut the local service down, even when an assertion above fails
-            localService.shutdown();
-        }
-    }
-
-    /**
-     * A write sent with a default {@link WriteContext} (no CRC32 set) must succeed.
-     */
-    @Test(timeout = 60000)
-    public void testWriteOpNoChecksum() throws Exception {
-        DistributedLogServiceImpl localService = createConfiguredLocalService();
-        try {
-            WriteContext ctx = new WriteContext();
-            Future<WriteResponse> result =
-                    localService.writeWithContext("test", getTestDataBuffer(), ctx);
-            WriteResponse resp = Await.result(result);
-            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-        } finally {
-            // always shut the local service down, even when the assertion fails
-            localService.shutdown();
-        }
-    }
-
-    /**
-     * A truncate sent with a default {@link WriteContext} (no CRC32 set) must succeed.
-     */
-    @Test(timeout = 60000)
-    public void testTruncateOpNoChecksum() throws Exception {
-        DistributedLogServiceImpl localService = createConfiguredLocalService();
-        try {
-            WriteContext ctx = new WriteContext();
-            Future<WriteResponse> result =
-                    localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
-            WriteResponse resp = Await.result(result);
-            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-        } finally {
-            // always shut the local service down, even when the assertion fails
-            localService.shutdown();
-        }
-    }
-
-    /**
-     * Heartbeat, release and delete sent with a default {@link WriteContext}
-     * (no CRC32 set) must all succeed.
-     */
-    @Test(timeout = 60000)
-    public void testStreamOpNoChecksum() throws Exception {
-        DistributedLogServiceImpl localService = createConfiguredLocalService();
-        try {
-            WriteContext ctx = new WriteContext();
-            HeartbeatOptions option = new HeartbeatOptions();
-            option.setSendHeartBeatToReader(true);
-
-            // heartbeat to acquire the stream and then release the stream
-            Future<WriteResponse> result = localService.heartbeatWithOptions("test", ctx, option);
-            WriteResponse resp = Await.result(result);
-            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-            result = localService.release("test", ctx);
-            resp = Await.result(result);
-            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-
-            // heartbeat to acquire the stream and then delete the stream
-            result = localService.heartbeatWithOptions("test", ctx, option);
-            resp = Await.result(result);
-            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-            result = localService.delete("test", ctx);
-            resp = Await.result(result);
-            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-        } finally {
-            // always shut the local service down, even when an assertion above fails
-            localService.shutdown();
-        }
-    }
-
-    /**
-     * A write whose {@link WriteContext} carries an arbitrary CRC32 (999) must be
-     * rejected with {@link StatusCode#CHECKSUM_FAILED}.
-     */
-    @Test(timeout = 60000)
-    public void testWriteOpChecksumBadChecksum() throws Exception {
-        DistributedLogServiceImpl localService = createConfiguredLocalService();
-        try {
-            WriteContext ctx = new WriteContext().setCrc32(999);
-            Future<WriteResponse> result =
-                    localService.writeWithContext("test", getTestDataBuffer(), ctx);
-            WriteResponse resp = Await.result(result);
-            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
-        } finally {
-            // always shut the local service down, even when the assertion fails
-            localService.shutdown();
-        }
-    }
-
-    /**
-     * A checksum computed for stream {@code "test"} must not validate a write
-     * addressed to stream {@code "test1"}: the write is rejected with
-     * {@link StatusCode#CHECKSUM_FAILED}.
-     */
-    @Test(timeout = 60000)
-    public void testWriteOpChecksumBadStream() throws Exception {
-        DistributedLogServiceImpl localService = createConfiguredLocalService();
-        try {
-            WriteContext ctx = new WriteContext().setCrc32(
-                ProtocolUtils.writeOpCRC32("test", getTestDataBuffer().array()));
-            Future<WriteResponse> result =
-                    localService.writeWithContext("test1", getTestDataBuffer(), ctx);
-            WriteResponse resp = Await.result(result);
-            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
-        } finally {
-            // always shut the local service down, even when the assertion fails
-            localService.shutdown();
-        }
-    }
-
-    /**
-     * A checksum computed over the original payload must not validate the payload
-     * after one byte has been overwritten: the write is rejected with
-     * {@link StatusCode#CHECKSUM_FAILED}.
-     */
-    @Test(timeout = 60000)
-    public void testWriteOpChecksumBadData() throws Exception {
-        DistributedLogServiceImpl localService = createConfiguredLocalService();
-        try {
-            ByteBuffer buffer = getTestDataBuffer();
-            WriteContext ctx = new WriteContext().setCrc32(
-                ProtocolUtils.writeOpCRC32("test", buffer.array()));
-
-            // Overwrite 1 byte to corrupt data.
-            buffer.put(1, (byte) 0xab);
-            Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx);
-            WriteResponse resp = Await.result(result);
-            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
-        } finally {
-            // always shut the local service down, even when the assertion fails
-            localService.shutdown();
-        }
-    }
-
-    /**
-     * Heartbeat, release and delete sent with an arbitrary CRC32 (999) must each
-     * be rejected with {@link StatusCode#CHECKSUM_FAILED}.
-     */
-    @Test(timeout = 60000)
-    public void testStreamOpChecksumBadChecksum() throws Exception {
-        DistributedLogServiceImpl localService = createConfiguredLocalService();
-        try {
-            WriteContext ctx = new WriteContext().setCrc32(999);
-            Future<WriteResponse> result = localService.heartbeat("test", ctx);
-            WriteResponse resp = Await.result(result);
-            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
-            result = localService.release("test", ctx);
-            resp = Await.result(result);
-            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
-            result = localService.delete("test", ctx);
-            resp = Await.result(result);
-            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
-        } finally {
-            // always shut the local service down, even when an assertion above fails
-            localService.shutdown();
-        }
-    }
-
-    /**
-     * A truncate sent with an arbitrary CRC32 (999) must be rejected with
-     * {@link StatusCode#CHECKSUM_FAILED}.
-     */
-    @Test(timeout = 60000)
-    public void testTruncateOpChecksumBadChecksum() throws Exception {
-        DistributedLogServiceImpl localService = createConfiguredLocalService();
-        try {
-            WriteContext ctx = new WriteContext().setCrc32(999);
-            Future<WriteResponse> result =
-                    localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
-            WriteResponse resp = Await.result(result);
-            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
-        } finally {
-            // always shut the local service down, even when the assertion fails
-            localService.shutdown();
-        }
-    }
-
-    private WriteOp getWriteOp(String name, SettableFeature disabledFeature, Long checksum) {
-        return new WriteOp(name,
-            ByteBuffer.wrap("test".getBytes()),
-            new NullStatsLogger(),
-            new NullStatsLogger(),
-            new IdentityStreamPartitionConverter(),
-            new ServerConfiguration(),
-            (byte) 0,
-            checksum,
-            false,
-            disabledFeature,
-            DefaultAccessControlManager.INSTANCE);
-    }
-
-    @Test(timeout = 60000)
-    public void testStreamOpBadChecksumWithChecksumDisabled() throws Exception {
-        String streamName = testName.getMethodName();
-
-        SettableFeature disabledFeature = new SettableFeature("", 0);
-
-        WriteOp writeOp0 = getWriteOp(streamName, disabledFeature, 919191L);
-        WriteOp writeOp1 = getWriteOp(streamName, disabledFeature, 919191L);
-
-        try {
-            writeOp0.preExecute();
-            fail("should have thrown");
-        } catch (Exception ex) {
-        }
-
-        disabledFeature.set(1);
-        writeOp1.preExecute();
-    }
-
-    /**
-     * A write op carrying a checksum computed from its own stream name and
-     * payload must pass {@code preExecute()} both while the feature is 1 and
-     * after it is reset to 0.
-     */
-    @Test(timeout = 60000)
-    public void testStreamOpGoodChecksumWithChecksumDisabled() throws Exception {
-        final String streamName = testName.getMethodName();
-        final SettableFeature disabledFeature = new SettableFeature("", 1);
-        // same bytes that getWriteOp() uses as the op payload
-        final byte[] payload = "test".getBytes();
-
-        WriteOp opWhileFeatureSet =
-            getWriteOp(streamName, disabledFeature, ProtocolUtils.writeOpCRC32(streamName, payload));
-        WriteOp opAfterFeatureReset =
-            getWriteOp(streamName, disabledFeature, ProtocolUtils.writeOpCRC32(streamName, payload));
-
-        // feature is 1
-        opWhileFeatureSet.preExecute();
-
-        // feature is 0: the valid checksum must still be accepted
-        disabledFeature.set(0);
-        opAfterFeatureReset.preExecute();
-    }
-
-    /**
-     * Closing all streams while writes are still buffered (huge output buffer, no
-     * immediate or periodic flush) must complete every write with SUCCESS,
-     * WRITE_EXCEPTION or STREAM_UNAVAILABLE and leave both the cached and the
-     * acquired stream maps empty.
-     */
-    @Test(timeout = 60000)
-    public void testCloseStreamsShouldFlush() throws Exception {
-        DistributedLogConfiguration confLocal = newLocalConf();
-        confLocal.setOutputBufferSize(Integer.MAX_VALUE)
-                .setImmediateFlushEnabled(false)
-                .setPeriodicFlushFrequencyMilliSeconds(0);
-
-        String streamNamePrefix = testName.getMethodName();
-        DistributedLogServiceImpl localService = createService(serverConf, confLocal);
-        try {
-            StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
-
-            int numStreams = 10;
-            int numWrites = 10;
-            List<Future<WriteResponse>> futureList =
-                    Lists.newArrayListWithExpectedSize(numStreams * numWrites);
-            for (int i = 0; i < numStreams; i++) {
-                String streamName = streamNamePrefix + "-" + i;
-                HeartbeatOptions hbOptions = new HeartbeatOptions();
-                hbOptions.setSendHeartBeatToReader(true);
-                // make sure the first log segment of each stream created
-                FutureUtils.result(
-                        localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
-                for (int j = 0; j < numWrites; j++) {
-                    futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
-                }
-            }
-
-            assertEquals("There should be " + numStreams + " streams in cache",
-                    numStreams, streamManager.getCachedStreams().size());
-            // wait until every stream has been acquired
-            while (streamManager.getAcquiredStreams().size() < numStreams) {
-                TimeUnit.MILLISECONDS.sleep(20);
-            }
-
-            Future<List<Void>> closeResult = localService.closeStreams();
-            List<Void> closedStreams = Await.result(closeResult);
-            assertEquals("There should be " + numStreams + " streams closed",
-                    numStreams, closedStreams.size());
-            // all writes should be flushed
-            for (Future<WriteResponse> future : futureList) {
-                WriteResponse response = Await.result(future);
-                assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(),
-                        StatusCode.SUCCESS == response.getHeader().getCode()
-                            || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
-                            || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode());
-            }
-            assertTrue("There should be no streams in the cache",
-                    streamManager.getCachedStreams().isEmpty());
-            assertTrue("There should be no streams in the acquired cache",
-                    streamManager.getAcquiredStreams().isEmpty());
-        } finally {
-            // always shut the local service down, even when an assertion above fails
-            localService.shutdown();
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testCloseStreamsShouldAbort() throws Exception {
-        DistributedLogConfiguration confLocal = newLocalConf();
-        confLocal.setOutputBufferSize(Integer.MAX_VALUE)
-                .setImmediateFlushEnabled(false)
-                .setPeriodicFlushFrequencyMilliSeconds(0);
-
-        String streamNamePrefix = testName.getMethodName();
-        DistributedLogServiceImpl localService = createService(serverConf, confLocal);
-        StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
-
-        int numStreams = 10;
-        int numWrites = 10;
-        List<Future<WriteResponse>> futureList =
-                Lists.newArrayListWithExpectedSize(numStreams * numWrites);
-        for (int i = 0; i < numStreams; i++) {
-            String streamName = streamNamePrefix + "-" + i;
-            HeartbeatOptions hbOptions = new HeartbeatOptions();
-            hbOptions.setSendHeartBeatToReader(true);
-            // make sure the first log segment of each stream created
-            FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
-            for (int j = 0; j < numWrites; j++) {
-                futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
-            }
-        }
-
-        assertEquals("There should be " + numStreams + " streams in cache",
-                numStreams, streamManager.getCachedStreams().size());
-        while (streamManager.getAcquiredStreams().size() < numStreams) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-
-        for (Stream s : streamManager.getAcquiredStreams().values()) {
-            StreamImpl stream = (StreamImpl) s;
-            stream.setStatus(StreamStatus.ERROR);
-        }
-
-        Future<List<Void>> closeResult = localService.closeStreams();
-        List<Void> closedStreams = Await.result(closeResult);
-        assertEquals("There should be " + numStreams + " streams closed",
-                numStreams, closedStreams.size());
-        // all writes should be flushed
-        for (Future<WriteResponse> future : futureList) {
-            WriteResponse response = Await.result(future);
-            assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : "
-                    + response.getHeader().getCode(),
-                    StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
-                        || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
-                        || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
-        }
-        // acquired streams should all been removed after we close them
-        assertTrue("There should be no streams in the acquired cache",
-            streamManager.getAcquiredStreams().isEmpty());
-        localService.shutdown();
-        // cached streams wouldn't be removed immediately after streams are closed
-        // but they should be removed after we shutdown the service
-        assertTrue("There should be no streams in the cache after shutting down the service",
-            streamManager.getCachedStreams().isEmpty());
-    }
-
-    /**
-     * After the service has been shut down, a write must be answered with
-     * {@link StatusCode#SERVICE_UNAVAILABLE} and must not create or acquire any stream.
-     */
-    @Test(timeout = 60000)
-    public void testShutdown() throws Exception {
-        service.shutdown();
-        final StreamManagerImpl streamManager = (StreamManagerImpl) service.getStreamManager();
-
-        // write against the already shut down service
-        Future<WriteResponse> writeResult = service.write(testName.getMethodName(), createRecord(0L));
-        WriteResponse response = Await.result(writeResult);
-        assertEquals("Write should fail with " + StatusCode.SERVICE_UNAVAILABLE,
-                StatusCode.SERVICE_UNAVAILABLE, response.getHeader().getCode());
-
-        // the rejected write must not have left any stream behind
-        assertTrue("There should be no streams created after shutdown",
-                streamManager.getCachedStreams().isEmpty());
-        assertTrue("There should be no streams acquired after shutdown",
-                streamManager.getAcquiredStreams().isEmpty());
-    }
-
-    /**
-     * {@code getOwner} must answer {@link StatusCode#FOUND} with this service's
-     * address as the location, both for a stream the service has not touched
-     * ("stream-1") and for one it has already acquired ("stream-2").
-     */
-    @Test(timeout = 60000)
-    public void testGetOwner() throws Exception {
-        LocalRoutingService routingService = (LocalRoutingService) service.getRoutingService();
-        routingService
-                .addHost("stream-0", service.getServiceAddress().getSocketAddress())
-                .setAllowRetrySameHost(false);
-
-        service.startPlacementPolicy();
-
-        // a stream the service has not touched yet
-        WriteResponse ownerResponse =
-                FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
-        assertEquals(StatusCode.FOUND, ownerResponse.getHeader().getCode());
-        assertEquals(service.getServiceAddress().toString(),
-                ownerResponse.getHeader().getLocation());
-
-        // service cache "stream-2"
-        StreamImpl acquiredStream =
-                (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false);
-        // create write ops to stream-2 to make service acquire the stream;
-        // the op is submitted first and the stream is started afterwards
-        WriteOp writeOp = createWriteOp(service, "stream-2", 0L);
-        acquiredStream.submit(writeOp);
-        acquiredStream.start();
-        WriteResponse writeResponse = Await.result(writeOp.result());
-        assertEquals("Op should succeed",
-                StatusCode.SUCCESS, writeResponse.getHeader().getCode());
-        assertEquals("Service should acquire stream",
-                StreamStatus.INITIALIZED, acquiredStream.getStatus());
-        assertNotNull(acquiredStream.getManager());
-        assertNotNull(acquiredStream.getWriter());
-        assertNull(acquiredStream.getLastException());
-
-        // the stream is acquired
-        ownerResponse = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
-        assertEquals(StatusCode.FOUND, ownerResponse.getHeader().getCode());
-        assertEquals(service.getServiceAddress().toString(),
-                ownerResponse.getHeader().getLocation());
-    }
-
-}