You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:19 UTC

[12/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
new file mode 100644
index 0000000..58b5b2a
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.client.DistributedLogClientImpl;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.routing.LocalRoutingService;
+import org.apache.distributedlog.client.routing.RegionsRoutingService;
+import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.service.stream.StreamManagerImpl;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Base test case for distributedlog servers.
+ */
+public abstract class DistributedLogServerTestCase {
+
+    protected static DistributedLogConfiguration conf =
+            new DistributedLogConfiguration().setLockTimeout(10)
+                    .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
+    protected static DistributedLogConfiguration noAdHocConf =
+            new DistributedLogConfiguration().setLockTimeout(10).setCreateStreamIfNotExists(false)
+                    .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
+    protected static DistributedLogCluster dlCluster;
+    protected static DistributedLogCluster noAdHocCluster;
+
+    /**
+     * A distributedlog client wrapper for testing.
+     */
+    protected static class DLClient {
+        public final LocalRoutingService routingService;
+        public DistributedLogClientBuilder dlClientBuilder;
+        public final DistributedLogClientImpl dlClient;
+
+        protected DLClient(String name,
+                           String streamNameRegex,
+                           Optional<String> serverSideRoutingFinagleName) {
+            routingService = LocalRoutingService.newBuilder().build();
+            dlClientBuilder = DistributedLogClientBuilder.newBuilder()
+                        .name(name)
+                        .clientId(ClientId$.MODULE$.apply(name))
+                        .routingService(routingService)
+                        .streamNameRegex(streamNameRegex)
+                        .handshakeWithClientInfo(true)
+                        .clientBuilder(ClientBuilder.get()
+                            .hostConnectionLimit(1)
+                            .connectionTimeout(Duration.fromSeconds(1))
+                            .requestTimeout(Duration.fromSeconds(60)));
+            if (serverSideRoutingFinagleName.isPresent()) {
+                dlClientBuilder =
+                        dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get());
+            }
+            dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
+        }
+
+        public void handshake() {
+            dlClient.handshake();
+        }
+
+        public void shutdown() {
+            dlClient.close();
+        }
+    }
+
+    /**
+     * A distributedlog client wrapper that talks to two regions.
+     */
+    protected static class TwoRegionDLClient {
+
+        public final LocalRoutingService localRoutingService;
+        public final LocalRoutingService remoteRoutingService;
+        public final DistributedLogClientBuilder dlClientBuilder;
+        public final DistributedLogClientImpl dlClient;
+
+        protected TwoRegionDLClient(String name, Map<SocketAddress, String> regionMap) {
+            localRoutingService = new LocalRoutingService();
+            remoteRoutingService = new LocalRoutingService();
+            RegionsRoutingService regionsRoutingService =
+                    RegionsRoutingService.of(new DefaultRegionResolver(regionMap),
+                            localRoutingService, remoteRoutingService);
+            dlClientBuilder = DistributedLogClientBuilder.newBuilder()
+                        .name(name)
+                        .clientId(ClientId$.MODULE$.apply(name))
+                        .routingService(regionsRoutingService)
+                        .streamNameRegex(".*")
+                        .handshakeWithClientInfo(true)
+                        .maxRedirects(2)
+                        .clientBuilder(ClientBuilder.get()
+                            .hostConnectionLimit(1)
+                            .connectionTimeout(Duration.fromSeconds(1))
+                            .requestTimeout(Duration.fromSeconds(10)));
+            dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
+        }
+
+        public void shutdown() {
+            dlClient.close();
+        }
+    }
+
+    private final boolean clientSideRouting;
+    protected DLServer dlServer;
+    protected DLClient dlClient;
+    protected DLServer noAdHocServer;
+    protected DLClient noAdHocClient;
+
+    public static DistributedLogCluster createCluster(DistributedLogConfiguration conf) throws Exception {
+        return DistributedLogCluster.newBuilder()
+            .numBookies(3)
+            .shouldStartZK(true)
+            .zkServers("127.0.0.1")
+            .shouldStartProxy(false)
+            .dlConf(conf)
+            .bkConf(DLMTestUtil.loadTestBkConf())
+            .build();
+    }
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        dlCluster = createCluster(conf);
+        dlCluster.start();
+    }
+
+    public void setupNoAdHocCluster() throws Exception {
+        noAdHocCluster = createCluster(noAdHocConf);
+        noAdHocCluster.start();
+        noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false);
+        Optional<String> serverSideRoutingFinagleName = Optional.absent();
+        if (!clientSideRouting) {
+            serverSideRoutingFinagleName =
+                    Optional.of("inet!" + DLSocketAddress.toString(noAdHocServer.getAddress()));
+        }
+        noAdHocClient = createDistributedLogClient("no-ad-hoc-client", serverSideRoutingFinagleName);
+    }
+
+    public void tearDownNoAdHocCluster() throws Exception {
+        if (null != noAdHocClient) {
+            noAdHocClient.shutdown();
+        }
+        if (null != noAdHocServer) {
+            noAdHocServer.shutdown();
+        }
+    }
+
+    @AfterClass
+    public static void teardownCluster() throws Exception {
+        if (null != dlCluster) {
+            dlCluster.stop();
+        }
+        if (null != noAdHocCluster) {
+            noAdHocCluster.stop();
+        }
+    }
+
+    protected static URI getUri() {
+        return dlCluster.getUri();
+    }
+
+    protected DistributedLogServerTestCase(boolean clientSideRouting) {
+        this.clientSideRouting = clientSideRouting;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        dlServer = createDistributedLogServer(7001);
+        Optional<String> serverSideRoutingFinagleName = Optional.absent();
+        if (!clientSideRouting) {
+            serverSideRoutingFinagleName =
+                    Optional.of("inet!" + DLSocketAddress.toString(dlServer.getAddress()));
+        }
+        dlClient = createDistributedLogClient("test", serverSideRoutingFinagleName);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (null != dlClient) {
+            dlClient.shutdown();
+        }
+        if (null != dlServer) {
+            dlServer.shutdown();
+        }
+    }
+
+    protected DLServer createDistributedLogServer(int port) throws Exception {
+        return new DLServer(conf, dlCluster.getUri(), port, false);
+    }
+
+    protected DLServer createDistributedLogServer(DistributedLogConfiguration conf, int port)
+            throws Exception {
+        return new DLServer(conf, dlCluster.getUri(), port, false);
+    }
+
+    protected DLClient createDistributedLogClient(String clientName,
+                                                  Optional<String> serverSideRoutingFinagleName)
+            throws Exception {
+        return createDistributedLogClient(clientName, ".*", serverSideRoutingFinagleName);
+    }
+
+    protected DLClient createDistributedLogClient(String clientName,
+                                                  String streamNameRegex,
+                                                  Optional<String> serverSideRoutingFinagleName)
+            throws Exception {
+        return new DLClient(clientName, streamNameRegex, serverSideRoutingFinagleName);
+    }
+
+    protected TwoRegionDLClient createTwoRegionDLClient(String clientName,
+                                                        Map<SocketAddress, String> regionMap)
+            throws Exception {
+        return new TwoRegionDLClient(clientName, regionMap);
+    }
+
+    protected static void checkStreams(int numExpectedStreams, DLServer dlServer) {
+        StreamManager streamManager = dlServer.dlServer.getKey().getStreamManager();
+        assertEquals(numExpectedStreams, streamManager.numCached());
+        assertEquals(numExpectedStreams, streamManager.numAcquired());
+    }
+
+    protected static void checkStreams(Set<String> streams, DLServer dlServer) {
+        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
+        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
+        Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
+
+        assertEquals(streams.size(), cachedStreams.size());
+        assertEquals(streams.size(), acquiredStreams.size());
+        assertTrue(Sets.difference(streams, cachedStreams).isEmpty());
+        assertTrue(Sets.difference(streams, acquiredStreams).isEmpty());
+    }
+
+    protected static void checkStream(String name, DLClient dlClient, DLServer dlServer,
+                                      int expectedNumProxiesInClient, int expectedClientCacheSize,
+                                      int expectedServerCacheSize, boolean existedInServer, boolean existedInClient) {
+        Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
+        assertEquals(expectedNumProxiesInClient, distribution.size());
+
+        if (expectedNumProxiesInClient > 0) {
+            Map.Entry<SocketAddress, Set<String>> localEntry =
+                    distribution.entrySet().iterator().next();
+            assertEquals(dlServer.getAddress(), localEntry.getKey());
+            assertEquals(expectedClientCacheSize, localEntry.getValue().size());
+            assertEquals(existedInClient, localEntry.getValue().contains(name));
+        }
+
+        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
+        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
+        Set<String> acquiredStreams = streamManager.getCachedStreams().keySet();
+
+        assertEquals(expectedServerCacheSize, cachedStreams.size());
+        assertEquals(existedInServer, cachedStreams.contains(name));
+        assertEquals(expectedServerCacheSize, acquiredStreams.size());
+        assertEquals(existedInServer, acquiredStreams.contains(name));
+    }
+
+    protected static Map<SocketAddress, Set<String>> getStreamOwnershipDistribution(DLClient dlClient) {
+        return dlClient.dlClient.getStreamOwnershipDistribution();
+    }
+
+    protected static Set<String> getAllStreamsFromDistribution(Map<SocketAddress, Set<String>> distribution) {
+        Set<String> allStreams = new HashSet<String>();
+        for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) {
+            allStreams.addAll(entry.getValue());
+        }
+        return allStreams;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
new file mode 100644
index 0000000..4a5dd01
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Optional;
+import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.TestZooKeeperClientBuilder;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.client.routing.LocalRoutingService;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.impl.acl.ZKAccessControl;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.stream.StreamManagerImpl;
+import org.apache.distributedlog.thrift.AccessControlEntry;
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.util.FailpointUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for {@link DistributedLogServer}.
+ */
+public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    /**
+     * Creates the test base.
+     *
+     * @param clientSideRouting routing flag handed unchanged to the parent test case
+     */
+    protected TestDistributedLogServerBase(final boolean clientSideRouting) {
+        super(clientSideRouting);
+    }
+
+    /**
+     * Writes ten records through the proxy and reads them back in order.
+     *
+     * <p>Currently ignored and marked flaky; see
+     * <a href="https://issues.apache.org/jira/browse/DL-27">DL-27</a>.
+     */
+    @DistributedLogAnnotations.FlakyTest
+    @Ignore
+    @Test(timeout = 60000)
+    public void testBasicWrite() throws Exception {
+        String name = "dlserver-basic-write";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        for (long i = 1; i <= 10; i++) {
+            logger.debug("Write entry {} to stream {}.", i, name);
+            // Encode explicitly so the payload does not depend on the platform default charset.
+            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
+        }
+
+        // NOTE(review): these options are built but never passed to the heartbeat call below;
+        // confirm whether they are still needed.
+        HeartbeatOptions hbOptions = new HeartbeatOptions();
+        hbOptions.setSendHeartBeatToReader(true);
+        // make sure the first log segment of each stream created
+        FutureUtils.result(dlClient.dlClient.heartbeat(name));
+
+        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader = dlm.getInputStream(1);
+            try {
+                int numRead = 0;
+                LogRecord r = reader.readNext(false);
+                while (null != r) {
+                    ++numRead;
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(numRead, i);
+                    r = reader.readNext(false);
+                }
+                assertEquals(10, numRead);
+            } finally {
+                // Release the reader even when an assertion above fails.
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Sanity check to make sure both checksum flag values work.
+     *
+     * <p>Builds one client with checksums disabled and one with checksums enabled from the same
+     * builder and performs a single write with each. A write that throws fails the test.
+     */
+    @Test(timeout = 60000)
+    public void testChecksumFlag() throws Exception {
+        String name = "testChecksumFlag";
+        LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
+        routingService.addHost(name, dlServer.getAddress());
+        DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
+            .name(name)
+            .clientId(ClientId$.MODULE$.apply("test"))
+            .routingService(routingService)
+            .handshakeWithClientInfo(true)
+            .clientBuilder(ClientBuilder.get()
+                .hostConnectionLimit(1)
+                .connectionTimeout(Duration.fromSeconds(1))
+                .requestTimeout(Duration.fromSeconds(60)))
+            .checksum(false);
+
+        DistributedLogClient client = dlClientBuilder.build();
+        try {
+            Await.result(client.write(name, ByteBuffer.wrap(("1").getBytes(UTF_8))));
+        } finally {
+            // Close the client even when the write fails so it is not leaked.
+            client.close();
+        }
+
+        client = dlClientBuilder.checksum(true).build();
+        try {
+            Await.result(client.write(name, ByteBuffer.wrap(("2").getBytes(UTF_8))));
+        } finally {
+            client.close();
+        }
+    }
+
+    /**
+     * Bulk-writes the numbers {@code 1..writeCount} to a stream named after the count, waits for
+     * every write to complete, then reads the stream back and checks order and total.
+     *
+     * @param writeCount number of records to write and expect back
+     * @throws Exception if any write or read fails
+     */
+    private void runSimpleBulkWriteTest(int writeCount) throws Exception {
+        String name = String.format("dlserver-bulk-write-%d", writeCount);
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount);
+        for (long i = 1; i <= writeCount; i++) {
+            // Encode explicitly so the payload does not depend on the platform default charset.
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+
+        logger.debug("Write {} entries to stream {}.", writeCount, name);
+        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+        // Expected value goes first so a failure message reads correctly.
+        assertEquals(writeCount, futures.size());
+        for (Future<DLSN> future : futures) {
+            // No throw == pass.
+            Await.result(future, Duration.fromSeconds(10));
+        }
+
+        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader = dlm.getInputStream(1);
+            try {
+                int numRead = 0;
+                LogRecord r = reader.readNext(false);
+                while (null != r) {
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(numRead + 1, i);
+                    ++numRead;
+                    r = reader.readNext(false);
+                }
+                assertEquals(writeCount, numRead);
+            } finally {
+                // Release the reader even when an assertion above fails.
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /** Runs the simple bulk-write scenario with 100 records. */
+    @Test(timeout = 60000)
+    public void testBulkWrite() throws Exception {
+        final int numRecords = 100;
+        runSimpleBulkWriteTest(numRecords);
+    }
+
+    /** Runs the simple bulk-write scenario with a single record. */
+    @Test(timeout = 60000)
+    public void testBulkWriteSingleWrite() throws Exception {
+        final int numRecords = 1;
+        runSimpleBulkWriteTest(numRecords);
+    }
+
+    /** A bulk write of an empty list must return an empty list of futures. */
+    @Test(timeout = 60000)
+    public void testBulkWriteEmptyList() throws Exception {
+        final String stream = String.format("dlserver-bulk-write-%d", 0);
+        dlClient.routingService.addHost(stream, dlServer.getAddress());
+
+        List<ByteBuffer> nothingToWrite = new ArrayList<ByteBuffer>();
+        List<Future<DLSN>> results = dlClient.dlClient.writeBulk(stream, nothingToWrite);
+
+        assertEquals(0, results.size());
+    }
+
+    /** A bulk write whose list contains a null buffer must throw a NullPointerException. */
+    @Test(timeout = 60000)
+    public void testBulkWriteNullArg() throws Exception {
+        final String stream = String.format("dlserver-bulk-write-%s", "null");
+        dlClient.routingService.addHost(stream, dlServer.getAddress());
+
+        List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
+        payloads.add(null);
+
+        try {
+            dlClient.dlClient.writeBulk(stream, payloads);
+        } catch (NullPointerException expected) {
+            // The null element was rejected, which is what this test wants to see.
+            logger.info("Expected to catch NullPointException.");
+            return;
+        }
+        // Reaching this point means the call returned normally.
+        fail("should not have succeeded");
+    }
+
+    /** A bulk write of two zero-length buffers must yield two futures that both complete. */
+    @Test(timeout = 60000)
+    public void testBulkWriteEmptyBuffer() throws Exception {
+        final String stream = String.format("dlserver-bulk-write-%s", "empty");
+        dlClient.routingService.addHost(stream, dlServer.getAddress());
+
+        List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
+        for (int k = 0; k < 2; k++) {
+            payloads.add(ByteBuffer.wrap(("").getBytes()));
+        }
+
+        List<Future<DLSN>> results = dlClient.dlClient.writeBulk(stream, payloads);
+        assertEquals(2, results.size());
+        for (Future<DLSN> pending : results) {
+            // Completing without an exception is the pass condition.
+            Await.result(pending, Duration.fromSeconds(10));
+        }
+    }
+
+    /**
+     * Logs an unexpected exception and fails the current test.
+     *
+     * <p>The log line is prefixed with the name of the running test, taken from the
+     * {@code testName} rule, because this helper is reached from more than one test.
+     *
+     * @param ex the exception that was not expected
+     */
+    void failDueToWrongException(Exception ex) {
+        logger.info(testName.getMethodName() + ": ", ex);
+        fail(String.format("failed with wrong exception %s", ex.getClass().getName()));
+    }
+
+    /**
+     * Waits on the futures at indices {@code [start, finish)} and expects each one to fail.
+     *
+     * <p>A future that fails with a {@link DLException} is counted. A future that succeeds, or
+     * fails with any other exception, fails the test.
+     *
+     * @param futures write futures to inspect
+     * @param start first index to check (inclusive)
+     * @param finish last index to check (exclusive)
+     * @return number of futures that failed with a {@link DLException}
+     */
+    int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) {
+        int numFailures = 0;
+        int idx = start;
+        while (idx < finish) {
+            Future<DLSN> pending = futures.get(idx);
+            idx++;
+            try {
+                Await.result(pending, Duration.fromSeconds(10));
+                fail("future should have failed!");
+            } catch (DLException expected) {
+                numFailures++;
+            } catch (Exception unexpected) {
+                failDueToWrongException(unexpected);
+            }
+        }
+        return numFailures;
+    }
+
+    void validateFailedAsLogRecordTooLong(Future<DLSN> future) {
+        try {
+            Await.result(future, Duration.fromSeconds(10));
+            fail("should have failed");
+        } catch (DLException dle) {
+            assertEquals(StatusCode.TOO_LARGE_RECORD.getValue(), dle.getCode());
+        } catch (Exception ex) {
+            failDueToWrongException(ex);
+        }
+    }
+
+    /**
+     * Bulk-writes 100 normal records, one oversized record, then 100 more normal records.
+     *
+     * <p>The first 100 and the last 100 writes must succeed; the oversized write in the middle
+     * must fail as a too-large record.
+     */
+    @Test(timeout = 60000)
+    public void testBulkWritePartialFailure() throws Exception {
+        String name = String.format("dlserver-bulk-write-%s", "partial-failure");
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        final int writeCount = 100;
+
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1);
+        for (long i = 1; i <= writeCount; i++) {
+            // Encode explicitly so the payload does not depend on the platform default charset.
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+        // Too big, will cause partial failure.
+        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
+        writes.add(buf);
+        for (long i = 1; i <= writeCount; i++) {
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+
+        // Count succeeded.
+        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+        int succeeded = 0;
+        for (int i = 0; i < writeCount; i++) {
+            Future<DLSN> future = futures.get(i);
+            try {
+                Await.result(future, Duration.fromSeconds(10));
+                ++succeeded;
+            } catch (Exception ex) {
+                failDueToWrongException(ex);
+            }
+        }
+
+        validateFailedAsLogRecordTooLong(futures.get(writeCount));
+        // The writes after the oversized record are expected to complete successfully.
+        FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
+        assertEquals(writeCount, succeeded);
+    }
+
+    /**
+     * Bulk-writes one oversized record followed by 100 normal records.
+     *
+     * <p>The first write must fail as a too-large record; the remaining writes must complete.
+     */
+    @Test(timeout = 60000)
+    public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception {
+        String name = String.format("dlserver-bulk-write-%s", "first-write-failed");
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        final int writeCount = 100;
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
+        // One byte over the limit so the very first record is rejected.
+        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
+        writes.add(buf);
+        for (long i = 1; i <= writeCount; i++) {
+            // Encode explicitly so the payload does not depend on the platform default charset.
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+
+        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+        validateFailedAsLogRecordTooLong(futures.get(0));
+        FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1)));
+    }
+
+    /**
+     * With the lost-lock failpoint set, a bulk write issued directly against the service
+     * implementation must return a response whose header code is {@code LOCKING_EXCEPTION}.
+     */
+    @Test(timeout = 60000)
+    public void testBulkWriteTotalFailureLostLock() throws Exception {
+        String name = String.format("dlserver-bulk-write-%s", "lost-lock");
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        final int writeCount = 8;
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
+        ByteBuffer buf = ByteBuffer.allocate(8);
+        writes.add(buf);
+        for (long i = 1; i <= writeCount; i++) {
+            // Encode explicitly so the payload does not depend on the platform default charset.
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+        // Warm it up with a write.
+        Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8)));
+
+        // Failpoint a lost lock, make sure the failure gets promoted to an operation failure.
+        DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft();
+        try {
+            FailpointUtils.setFailpoint(
+                FailpointUtils.FailPointName.FP_WriteInternalLostLock,
+                FailpointUtils.FailPointActions.FailPointAction_Default
+            );
+            Future<BulkWriteResponse> response = svcImpl.writeBulkWithContext(name, writes, new WriteContext());
+            assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(response).header.code);
+        } finally {
+            // Always clear the failpoint so it cannot affect later tests.
+            FailpointUtils.removeFailpoint(
+                FailpointUtils.FailPointName.FP_WriteInternalLostLock
+            );
+        }
+    }
+
+    /**
+     * Sends ten {@code check} requests followed by one user write, then reads the stream back
+     * from {@link DLSN#InitialDLSN} and expects exactly one record, payload "1", at entry id 1.
+     */
+    @Test(timeout = 60000)
+    public void testHeartbeat() throws Exception {
+        String name = "dlserver-heartbeat";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        for (long i = 1; i <= 10; i++) {
+            logger.debug("Send heartbeat {} to stream {}.", i, name);
+            dlClient.dlClient.check(name).get();
+        }
+
+        logger.debug("Write entry one to stream {}.", name);
+        // Encode explicitly so the payload does not depend on the platform default charset.
+        dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))).get();
+
+        Thread.sleep(1000);
+
+        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
+            try {
+                int numRead = 0;
+                // eid=0 => control records
+                // other 9 heartbeats will not trigger writing any control records.
+                // eid=1 => user entry
+                long startEntryId = 1;
+                LogRecordWithDLSN r = reader.readNext(false);
+                while (null != r) {
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(numRead + 1, i);
+                    // Expected value goes first so a failure message reads correctly.
+                    assertEquals(0, r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)));
+                    ++numRead;
+                    ++startEntryId;
+                    r = reader.readNext(false);
+                }
+                assertEquals(1, numRead);
+            } finally {
+                // The reader and manager were previously never closed in this test.
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Writes ten records, fences the stream via {@code DLMTestUtil.fenceStream}, writes ten more,
+     * and then expects to read all twenty records back in order.
+     */
+    @Test(timeout = 60000)
+    public void testFenceWrite() throws Exception {
+        String name = "dlserver-fence-write";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        for (long i = 1; i <= 10; i++) {
+            logger.debug("Write entry {} to stream {}.", i, name);
+            // Encode explicitly so the payload does not depend on the platform default charset.
+            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))).get();
+        }
+
+        Thread.sleep(1000);
+
+        logger.info("Fencing stream {}.", name);
+        DLMTestUtil.fenceStream(conf, getUri(), name);
+        logger.info("Fenced stream {}.", name);
+
+        for (long i = 11; i <= 20; i++) {
+            logger.debug("Write entry {} to stream {}.", i, name);
+            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))).get();
+        }
+
+        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader = dlm.getInputStream(1);
+            try {
+                int numRead = 0;
+                LogRecord r = reader.readNext(false);
+                while (null != r) {
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(numRead + 1, i);
+                    ++numRead;
+                    r = reader.readNext(false);
+                }
+                assertEquals(20, numRead);
+            } finally {
+                // Release the reader even when an assertion above fails.
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testDeleteStream() throws Exception {
+        String name = "dlserver-delete-stream";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        long txid = 101;
+        for (long i = 1; i <= 10; i++) {
+            long curTxId = txid++;
+            logger.debug("Write entry {} to stream {}.", curTxId, name);
+            dlClient.dlClient.write(name,
+                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+        }
+
+        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
+
+        dlClient.dlClient.delete(name).get();
+
+        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
+
+        Thread.sleep(1000);
+
+        DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri());
+        AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN));
+        try {
+            FutureUtils.result(reader101.readNext());
+            fail("Should fail with LogNotFoundException since the stream is deleted");
+        } catch (LogNotFoundException lnfe) {
+            // expected
+        }
+        FutureUtils.result(reader101.asyncClose());
+        dlm101.close();
+
+        txid = 201;
+        for (long i = 1; i <= 10; i++) {
+            long curTxId = txid++;
+            logger.debug("Write entry {} to stream {}.", curTxId, name);
+            DLSN dlsn = dlClient.dlClient.write(name,
+                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+        }
+        Thread.sleep(1000);
+
+        DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri());
+        LogReader reader201 = dlm201.getInputStream(1);
+        int numRead = 0;
+        int curTxId = 201;
+        LogRecord r = reader201.readNext(false);
+        while (null != r) {
+            int i = Integer.parseInt(new String(r.getPayload()));
+            assertEquals(curTxId++, i);
+            ++numRead;
+            r = reader201.readNext(false);
+        }
+        assertEquals(10, numRead);
+        reader201.close();
+        dlm201.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateStream() throws Exception {
+        try {
+            setupNoAdHocCluster();
+            final String name = "dlserver-create-stream";
+
+            noAdHocClient.routingService.addHost("dlserver-create-stream", noAdHocServer.getAddress());
+            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
+
+            long txid = 101;
+            for (long i = 1; i <= 10; i++) {
+                long curTxId = txid++;
+                logger.debug("Write entry {} to stream {}.", curTxId, name);
+                noAdHocClient.dlClient.write(name,
+                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+            }
+
+            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+        } finally {
+            tearDownNoAdHocCluster();
+        }
+    }
+
+    /**
+     * This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing.
+     */
+    @Test(timeout = 60000)
+    public void testCreateStreamTwice() throws Exception {
+        try {
+            setupNoAdHocCluster();
+            final String name = "dlserver-create-stream-twice";
+
+            noAdHocClient.routingService.addHost("dlserver-create-stream-twice", noAdHocServer.getAddress());
+            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
+
+            long txid = 101;
+            for (long i = 1; i <= 10; i++) {
+                long curTxId = txid++;
+                logger.debug("Write entry {} to stream {}.", curTxId, name);
+                noAdHocClient.dlClient.write(name,
+                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+            }
+
+            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+
+            // create again
+            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
+            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+        } finally {
+            tearDownNoAdHocCluster();
+        }
+    }
+
+
+
+    /**
+     * Writes two batches of ten records (txids 1..20), truncates the stream at the DLSN of
+     * txid 11 and verifies that a fresh reader sees exactly the records 11..20.
+     */
+    @Test(timeout = 60000)
+    public void testTruncateStream() throws Exception {
+        String name = "dlserver-truncate-stream";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        long txid = 1;
+        // Remember the DLSN returned for every write so the truncation point can be looked up.
+        Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>();
+        for (int s = 1; s <= 2; s++) {
+            for (long i = 1; i <= 10; i++) {
+                long curTxId = txid++;
+                logger.debug("Write entry {} to stream {}.", curTxId, name);
+                DLSN dlsn = dlClient.dlClient.write(name,
+                        ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+                txid2DLSN.put(curTxId, dlsn);
+            }
+            // Release ownership after the first batch only.
+            // NOTE(review): presumably this forces the second batch into a new log segment - confirm.
+            if (s == 1) {
+                dlClient.dlClient.release(name).get();
+            }
+        }
+
+        // Truncate at the first record of the second batch.
+        DLSN dlsnToDelete = txid2DLSN.get(11L);
+        dlClient.dlClient.truncate(name, dlsnToDelete).get();
+
+        DistributedLogManager readDLM = DLMTestUtil.createNewDLM(name, conf, getUri());
+        LogReader reader = readDLM.getInputStream(1);
+        int numRead = 0;
+        int curTxId = 11;
+        LogRecord r = reader.readNext(false);
+        while (null != r) {
+            // Payloads are the decimal txids; they must continue from 11 without gaps.
+            int i = Integer.parseInt(new String(r.getPayload()));
+            assertEquals(curTxId++, i);
+            ++numRead;
+            r = reader.readNext(false);
+        }
+        assertEquals(10, numRead);
+        reader.close();
+        readDLM.close();
+    }
+
+    /**
+     * Installs a deny-write access control entry for the stream in ZooKeeper, waits until the
+     * namespace's access control manager reports writes as disallowed, and then verifies that
+     * a write through the proxy fails with {@code StatusCode.REQUEST_DENIED}.
+     *
+     * <p>The ZooKeeper client built by this test is closed in a {@code finally} block so it
+     * does not leak into subsequent tests.
+     */
+    @Test(timeout = 60000)
+    public void testRequestDenied() throws Exception {
+        String name = "request-denied";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        AccessControlEntry ace = new AccessControlEntry();
+        ace.setDenyWrite(true);
+        ZooKeeperClient zkc = TestZooKeeperClientBuilder
+                .newBuilder()
+                .uri(getUri())
+                .connectionTimeoutMs(60000)
+                .sessionTimeoutMs(60000)
+                .build();
+        try {
+            DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
+            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
+            String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name;
+            ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath);
+            accessControl.create(zkc);
+
+            // Poll until the access control manager observes the deny-write entry; the
+            // test-level timeout bounds this loop.
+            AccessControlManager acm = dlNamespace.createAccessControlManager();
+            while (acm.allowWrite(name)) {
+                Thread.sleep(100);
+            }
+
+            try {
+                Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
+                fail("Should fail with request denied exception");
+            } catch (DLException dle) {
+                assertEquals(StatusCode.REQUEST_DENIED.getValue(), dle.getCode());
+            }
+        } finally {
+            zkc.close();
+        }
+    }
+
+    /**
+     * Runs the stream-name-regex check with the match-all pattern {@code ".*"}: every
+     * written stream is expected to be reported back.
+     */
+    @Test(timeout = 60000)
+    public void testNoneStreamNameRegex() throws Exception {
+        final String prefix = "none-stream-name-regex-";
+        final int streamCount = 5;
+
+        Set<String> allStreams = new HashSet<String>();
+        int idx = 0;
+        while (idx < streamCount) {
+            allStreams.add(prefix + idx);
+            idx++;
+        }
+        testStreamNameRegex(allStreams, ".*", allStreams);
+    }
+
+    /**
+     * Runs the stream-name-regex check with a pattern equal to a single stream name: only
+     * that stream is expected to be reported back.
+     */
+    @Test(timeout = 60000)
+    public void testStreamNameRegex() throws Exception {
+        final String prefix = "stream-name-regex-";
+        final int streamCount = 5;
+        final String regex = prefix + "1";
+
+        Set<String> allStreams = new HashSet<String>();
+        int idx = 0;
+        while (idx < streamCount) {
+            allStreams.add(prefix + idx);
+            idx++;
+        }
+
+        Set<String> matching = new HashSet<String>();
+        matching.add(prefix + "1");
+
+        testStreamNameRegex(allStreams, regex, matching);
+    }
+
+    /**
+     * Writes one record to each of {@code streams}, then builds a separate client with the
+     * given stream name regex and asserts that, after handshake, its ownership distribution
+     * holds a single proxy whose stream set has the size of {@code expectedStreams} and
+     * contains only members of it.
+     *
+     * @param streams stream names to write to through the shared client
+     * @param streamNameRegex regex handed to the newly created client
+     * @param expectedStreams stream names the new client is expected to report
+     */
+    private void testStreamNameRegex(Set<String> streams, String streamNameRegex,
+                                     Set<String> expectedStreams)
+            throws Exception {
+        for (String stream : streams) {
+            dlClient.routingService.addHost(stream, dlServer.getAddress());
+            ByteBuffer payload = ByteBuffer.wrap(stream.getBytes(UTF_8));
+            Await.result(dlClient.dlClient.write(stream, payload));
+        }
+
+        DLClient regexClient = createDistributedLogClient(
+                "test-stream-name-regex", streamNameRegex, Optional.<String>absent());
+        try {
+            regexClient.routingService.addHost("unknown", dlServer.getAddress());
+            regexClient.handshake();
+
+            Map<SocketAddress, Set<String>> ownership =
+                    regexClient.dlClient.getStreamOwnershipDistribution();
+            assertEquals(1, ownership.size());
+
+            Set<String> cached = ownership.values().iterator().next();
+            assertNotNull(cached);
+            assertEquals(expectedStreams.size(), cached.size());
+            assertTrue(expectedStreams.containsAll(cached));
+        } finally {
+            regexClient.shutdown();
+        }
+    }
+
+    /**
+     * Writes one record so the stream shows up in the client and server caches, then releases
+     * the stream and checks that both caches are empty again.
+     */
+    @Test(timeout = 60000)
+    public void testReleaseStream() throws Exception {
+        final String streamName = "dlserver-release-stream";
+        dlClient.routingService.addHost(streamName, dlServer.getAddress());
+
+        ByteBuffer payload = ByteBuffer.wrap("1".getBytes(UTF_8));
+        Await.result(dlClient.dlClient.write(streamName, payload));
+        checkStream(1, 1, 1, streamName, dlServer.getAddress(), true, true);
+
+        // Releasing must drop the stream from client and server bookkeeping.
+        Await.result(dlClient.dlClient.release(streamName));
+        checkStream(0, 0, 0, streamName, dlServer.getAddress(), false, false);
+    }
+
+    /**
+     * Asserts the client-side ownership distribution and the server-side stream bookkeeping
+     * for a stream.
+     *
+     * <p>The acquired set is read from {@code getAcquiredStreams()}; it was previously read
+     * from {@code getCachedStreams()}, which made the acquired-set assertions a repeat of the
+     * cached-set assertions.
+     *
+     * @param expectedNumProxiesInClient expected number of proxies in the client's distribution
+     * @param expectedClientCacheSize expected number of streams cached for the first proxy
+     *        (only checked when {@code expectedNumProxiesInClient > 0})
+     * @param expectedServerCacheSize expected size of both the server's cached and acquired sets
+     * @param name stream name to look up
+     * @param owner expected address of the first proxy in the client's distribution
+     * @param existedInServer whether the server's cached and acquired sets should contain {@code name}
+     * @param existedInClient whether the client's stream set should contain {@code name}
+     */
+    protected void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, int expectedServerCacheSize,
+                             String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) {
+        Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
+        assertEquals(expectedNumProxiesInClient, distribution.size());
+
+        if (expectedNumProxiesInClient > 0) {
+            Map.Entry<SocketAddress, Set<String>> localEntry =
+                    distribution.entrySet().iterator().next();
+            assertEquals(owner, localEntry.getKey());
+            assertEquals(expectedClientCacheSize, localEntry.getValue().size());
+            assertEquals(existedInClient, localEntry.getValue().contains(name));
+        }
+
+        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
+        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
+        Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
+
+        assertEquals(expectedServerCacheSize, cachedStreams.size());
+        assertEquals(existedInServer, cachedStreams.contains(name));
+        assertEquals(expectedServerCacheSize, acquiredStreams.size());
+        assertEquals(existedInServer, acquiredStreams.contains(name));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
new file mode 100644
index 0000000..c7ae960
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.fail;
+
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import org.junit.Test;
+
+/**
+ * Test the server with client side routing.
+ *
+ * <p>Inherits the tests of {@link TestDistributedLogServerBase} and adds a test for the
+ * proxy's accept-new-stream switch.
+ */
+public class TestDistributedLogServerClientRouting extends TestDistributedLogServerBase {
+
+    // NOTE(review): the boolean presumably selects client side routing in the base class - confirm.
+    public TestDistributedLogServerClientRouting() {
+        super(true);
+    }
+
+    /**
+     * Verifies that a write to a new stream fails with {@link NoBrokersAvailableException}
+     * while accept-new-stream is disabled, and succeeds once it is enabled again.
+     */
+    @Test(timeout = 60000)
+    public void testAcceptNewStream() throws Exception {
+        String name = "dlserver-accept-new-stream";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+        dlClient.routingService.setAllowRetrySameHost(false);
+
+        // Disable accepting new streams through the client API.
+        Await.result(dlClient.dlClient.setAcceptNewStream(false));
+
+        try {
+            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
+            fail("Should fail because the proxy couldn't accept new stream");
+        } catch (NoBrokersAvailableException nbae) {
+            // expected
+        }
+        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
+
+        // Re-enable directly on the server-side service object, then the write must succeed.
+        Await.result(dlServer.dlServer.getLeft().setAcceptNewStream(true));
+        Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
+        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
new file mode 100644
index 0000000..12416a3
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+/**
+ * Test the server with server side routing.
+ *
+ * <p>Adds no tests of its own; it only re-runs those inherited from
+ * {@link TestDistributedLogServerBase} with the constructor flag set to {@code false}.
+ */
+public class TestDistributedLogServerServerRouting extends TestDistributedLogServerBase {
+
+    // NOTE(review): false presumably disables client side routing in the base class - confirm.
+    public TestDistributedLogServerServerRouting() {
+        super(false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
new file mode 100644
index 0000000..4a2d65f
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
@@ -0,0 +1,833 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.distributedlog.acl.DefaultAccessControlManager;
+import org.apache.distributedlog.client.routing.LocalRoutingService;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.config.NullStreamConfigProvider;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
+import org.apache.distributedlog.service.stream.Stream;
+import org.apache.distributedlog.service.stream.StreamImpl;
+import org.apache.distributedlog.service.stream.StreamImpl.StreamStatus;
+import org.apache.distributedlog.service.stream.StreamManagerImpl;
+import org.apache.distributedlog.service.stream.WriteOp;
+import org.apache.distributedlog.service.streamset.DelimiterStreamPartitionConverter;
+import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.feature.SettableFeature;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for DistributedLog Service.
+ */
+public class TestDistributedLogService extends TestDistributedLogBase {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private ServerConfiguration serverConf;
+    private DistributedLogConfiguration dlConf;
+    private URI uri;
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private DistributedLogServiceImpl service;
+
+    /**
+     * Prepares the per-test fixture: a DistributedLog configuration derived from the base
+     * {@code conf}, a server configuration loaded from it, a namespace URI named after the
+     * running test method, and a service instance built from all of these.
+     */
+    @Before
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        dlConf = new DistributedLogConfiguration();
+        dlConf.addConfiguration(conf);
+        // Test-specific overrides layered on top of the inherited configuration.
+        dlConf.setLockTimeout(0)
+                .setOutputBufferSize(0)
+                .setPeriodicFlushFrequencyMilliSeconds(10)
+                .setSchedulerShutdownTimeoutMs(100);
+        serverConf = newLocalServerConf();
+        // One namespace per test method keeps streams of different tests apart.
+        uri = createDLMURI("/" + testName.getMethodName());
+        ensureURICreated(uri);
+        service = createService(serverConf, dlConf, latch);
+    }
+
+    /**
+     * Shuts down the service created in {@code setup()} (if any) before delegating to the
+     * base class teardown.
+     */
+    @After
+    @Override
+    public void teardown() throws Exception {
+        if (service != null) {
+            service.shutdown();
+        }
+        super.teardown();
+    }
+
+    /**
+     * Builds a fresh configuration that includes the settings of {@code dlConf}.
+     *
+     * @return a new configuration object layered on top of {@code dlConf}
+     */
+    private DistributedLogConfiguration newLocalConf() {
+        DistributedLogConfiguration localCopy = new DistributedLogConfiguration();
+        localCopy.addConfiguration(dlConf);
+        return localCopy;
+    }
+
+    /**
+     * Builds a server configuration loaded from {@code dlConf} with the server thread count
+     * set to one.
+     *
+     * @return the new server configuration
+     */
+    private ServerConfiguration newLocalServerConf() {
+        ServerConfiguration localServerConf = new ServerConfiguration();
+        localServerConf.loadConf(dlConf);
+        localServerConf.setServerThreads(1);
+        return localServerConf;
+    }
+
+    /**
+     * Convenience overload that creates a service with its own single-count latch.
+     *
+     * @param serverConf server configuration for the new service
+     * @param dlConf DistributedLog configuration for the new service
+     * @return the newly created service
+     */
+    private DistributedLogServiceImpl createService(
+            ServerConfiguration serverConf,
+            DistributedLogConfiguration dlConf) throws Exception {
+        CountDownLatch ownLatch = new CountDownLatch(1);
+        return createService(serverConf, dlConf, ownLatch);
+    }
+
+    /**
+     * Creates a {@link DistributedLogServiceImpl} against the test namespace {@code uri},
+     * using a local routing service, null stats loggers and an equal-load appraiser.
+     *
+     * @param serverConf server configuration; also supplies the stream partition converter class
+     * @param dlConf DistributedLog configuration (also used to derive the constant dynamic config)
+     * @param latch latch handed to the service constructor
+     * @return the newly created service
+     */
+    private DistributedLogServiceImpl createService(
+            ServerConfiguration serverConf,
+            DistributedLogConfiguration dlConf,
+            CountDownLatch latch) throws Exception {
+        // Build the stream partition converter
+        StreamPartitionConverter converter;
+        try {
+            converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
+        } catch (ConfigurationException e) {
+            // Fall back to the identity converter when the configured class cannot be loaded.
+            logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
+                    IdentityStreamPartitionConverter.class.getName());
+            converter = new IdentityStreamPartitionConverter();
+        }
+        return new DistributedLogServiceImpl(
+            serverConf,
+            dlConf,
+            ConfUtils.getConstDynConf(dlConf),
+            new NullStreamConfigProvider(),
+            uri,
+            converter,
+            new LocalRoutingService(),
+            NullStatsLogger.INSTANCE,
+            NullStatsLogger.INSTANCE,
+            latch,
+            new EqualLoadAppraiser());
+    }
+
+    /**
+     * Creates a stream on the given service and initializes it without starting it.
+     *
+     * @param service service that owns the new stream
+     * @param name stream name
+     * @return the initialized, not yet started stream
+     */
+    private StreamImpl createUnstartedStream(DistributedLogServiceImpl service,
+                                             String name) throws Exception {
+        StreamImpl unstarted = (StreamImpl) service.newStream(name);
+        unstarted.initialize();
+        return unstarted;
+    }
+
+    /**
+     * Builds a record payload of the form {@code "record-<txid>"} encoded as UTF-8.
+     *
+     * @param txid id embedded in the payload
+     * @return a buffer wrapping the encoded payload
+     */
+    private ByteBuffer createRecord(long txid) {
+        String payload = "record-" + txid;
+        return ByteBuffer.wrap(payload.getBytes(UTF_8));
+    }
+
+    /**
+     * Creates a write op on the given service whose payload is the record for {@code txid}.
+     *
+     * @param service service used to build the op
+     * @param streamName target stream name
+     * @param txid id embedded in the record payload
+     * @return the new write op
+     */
+    private WriteOp createWriteOp(DistributedLogServiceImpl service,
+                                  String streamName,
+                                  long txid) {
+        // The third argument is passed as null, exactly as before.
+        return service.newWriteOp(streamName, createRecord(txid), null);
+    }
+
+    /**
+     * Two services compete for the same stream: the first one to start acquires it and its
+     * pending write succeeds, the second one fails with {@code StatusCode.FOUND} and ends up
+     * in an unavailable state with an {@link OwnershipAcquireFailedException}.
+     *
+     * <p>The second service is shut down in a {@code finally} block so that a failed
+     * assertion does not leak it.
+     */
+    @Test(timeout = 60000)
+    public void testAcquireStreams() throws Exception {
+        String streamName = testName.getMethodName();
+        StreamImpl s0 = createUnstartedStream(service, streamName);
+        ServerConfiguration serverConf1 = new ServerConfiguration();
+        serverConf1.addConfiguration(serverConf);
+        serverConf1.setServerPort(9999);
+        DistributedLogServiceImpl service1 = createService(serverConf1, dlConf);
+        try {
+            StreamImpl s1 = createUnstartedStream(service1, streamName);
+
+            // create write ops
+            WriteOp op0 = createWriteOp(service, streamName, 0L);
+            s0.submit(op0);
+
+            WriteOp op1 = createWriteOp(service1, streamName, 1L);
+            s1.submit(op1);
+
+            // check pending size
+            assertEquals("Write Op 0 should be pending in service 0",
+                    1, s0.numPendingOps());
+            assertEquals("Write Op 1 should be pending in service 1",
+                    1, s1.numPendingOps());
+
+            // start acquiring s0
+            s0.start();
+            WriteResponse wr0 = Await.result(op0.result());
+            assertEquals("Op 0 should succeed",
+                    StatusCode.SUCCESS, wr0.getHeader().getCode());
+            assertEquals("Service 0 should acquire stream",
+                    StreamStatus.INITIALIZED, s0.getStatus());
+            assertNotNull(s0.getManager());
+            assertNotNull(s0.getWriter());
+            assertNull(s0.getLastException());
+
+            // start acquiring s1
+            s1.start();
+            WriteResponse wr1 = Await.result(op1.result());
+            assertEquals("Op 1 should fail",
+                    StatusCode.FOUND, wr1.getHeader().getCode());
+            // the stream will be set to ERROR and then be closed.
+            assertTrue("Service 1 should be in unavailable state",
+                    StreamStatus.isUnavailable(s1.getStatus()));
+            assertNotNull(s1.getManager());
+            assertNull(s1.getWriter());
+            assertNotNull(s1.getLastException());
+            assertTrue(s1.getLastException() instanceof OwnershipAcquireFailedException);
+        } finally {
+            service1.shutdown();
+        }
+    }
+
+    /**
+     * With the max cached partitions per proxy set to 1 and the delimiter partition
+     * converter, a second partition of the same stream cannot be obtained, while a partition
+     * of a different stream can still be cached and acquired.
+     *
+     * <p>The locally created service is shut down in a {@code finally} block; previously it
+     * was never shut down.
+     */
+    @Test(timeout = 60000)
+    public void testAcquireStreamsWhenExceedMaxCachedPartitions() throws Exception {
+        String streamName = testName.getMethodName() + "_0000";
+
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(dlConf);
+        confLocal.setMaxCachedPartitionsPerProxy(1);
+
+        ServerConfiguration serverConfLocal = new ServerConfiguration();
+        serverConfLocal.addConfiguration(serverConf);
+        serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class);
+
+        DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal);
+        try {
+            Stream stream = serviceLocal.getLogWriter(streamName);
+
+            // stream is cached
+            assertNotNull(stream);
+            assertEquals(1, serviceLocal.getStreamManager().numCached());
+
+            // create write ops
+            // NOTE(review): the op is built by the shared `service` but submitted to a stream
+            // of `serviceLocal` - kept as is; confirm this is intended.
+            WriteOp op0 = createWriteOp(service, streamName, 0L);
+            stream.submit(op0);
+            WriteResponse wr0 = Await.result(op0.result());
+            assertEquals("Op 0 should succeed",
+                    StatusCode.SUCCESS, wr0.getHeader().getCode());
+            assertEquals(1, serviceLocal.getStreamManager().numAcquired());
+
+            // should fail to acquire another partition
+            try {
+                serviceLocal.getLogWriter(testName.getMethodName() + "_0001");
+                fail("Should fail to acquire new streams");
+            } catch (StreamUnavailableException sue) {
+                // expected
+            }
+            assertEquals(1, serviceLocal.getStreamManager().numCached());
+            assertEquals(1, serviceLocal.getStreamManager().numAcquired());
+
+            // should be able to acquire partitions from other streams
+            String anotherStreamName = testName.getMethodName() + "-another_0001";
+            Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName);
+            assertNotNull(anotherStream);
+            assertEquals(2, serviceLocal.getStreamManager().numCached());
+
+            // create write ops
+            WriteOp op1 = createWriteOp(service, anotherStreamName, 0L);
+            anotherStream.submit(op1);
+            WriteResponse wr1 = Await.result(op1.result());
+            assertEquals("Op 1 should succeed",
+                    StatusCode.SUCCESS, wr1.getHeader().getCode());
+            assertEquals(2, serviceLocal.getStreamManager().numAcquired());
+        } finally {
+            serviceLocal.shutdown();
+        }
+    }
+
+    /**
+     * With the max cached partitions set to -1 and the max acquired partitions per proxy set
+     * to 1, a second partition of the same stream can be cached but its write fails with
+     * {@code StatusCode.STREAM_UNAVAILABLE} and the acquired count stays at one.
+     *
+     * <p>The locally created service is shut down in a {@code finally} block; previously it
+     * was never shut down.
+     */
+    @Test(timeout = 60000)
+    public void testAcquireStreamsWhenExceedMaxAcquiredPartitions() throws Exception {
+        String streamName = testName.getMethodName() + "_0000";
+
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(dlConf);
+        confLocal.setMaxCachedPartitionsPerProxy(-1);
+        confLocal.setMaxAcquiredPartitionsPerProxy(1);
+
+        ServerConfiguration serverConfLocal = new ServerConfiguration();
+        serverConfLocal.addConfiguration(serverConf);
+        serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class);
+
+        DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal);
+        try {
+            Stream stream = serviceLocal.getLogWriter(streamName);
+
+            // stream is cached
+            assertNotNull(stream);
+            assertEquals(1, serviceLocal.getStreamManager().numCached());
+
+            // create write ops
+            // NOTE(review): the op is built by the shared `service` but submitted to a stream
+            // of `serviceLocal` - kept as is; confirm this is intended.
+            WriteOp op0 = createWriteOp(service, streamName, 0L);
+            stream.submit(op0);
+            WriteResponse wr0 = Await.result(op0.result());
+            assertEquals("Op 0 should succeed",
+                    StatusCode.SUCCESS, wr0.getHeader().getCode());
+            assertEquals(1, serviceLocal.getStreamManager().numAcquired());
+
+            // should be able to cache partitions from same stream
+            String anotherStreamName = testName.getMethodName() + "_0001";
+            Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName);
+            assertNotNull(anotherStream);
+            assertEquals(2, serviceLocal.getStreamManager().numCached());
+
+            // create write ops
+            WriteOp op1 = createWriteOp(service, anotherStreamName, 0L);
+            anotherStream.submit(op1);
+            WriteResponse wr1 = Await.result(op1.result());
+            assertEquals("Op 1 should fail",
+                    StatusCode.STREAM_UNAVAILABLE, wr1.getHeader().getCode());
+            assertEquals(1, serviceLocal.getStreamManager().numAcquired());
+        } finally {
+            serviceLocal.shutdown();
+        }
+    }
+
+    /**
+     * Queues ten writes on an unstarted stream, closes the stream and checks that every
+     * queued write completes with {@code StatusCode.STREAM_UNAVAILABLE}.
+     */
+    @Test(timeout = 60000)
+    public void testCloseShouldErrorOutPendingOps() throws Exception {
+        final String streamName = testName.getMethodName();
+        final StreamImpl stream = createUnstartedStream(service, streamName);
+
+        final int numWrites = 10;
+        List<Future<WriteResponse>> pendingResults =
+                new ArrayList<Future<WriteResponse>>(numWrites);
+        for (int txid = 0; txid < numWrites; txid++) {
+            WriteOp pendingOp = createWriteOp(service, streamName, txid);
+            stream.submit(pendingOp);
+            pendingResults.add(pendingOp.result());
+        }
+        assertEquals(numWrites, stream.numPendingOps());
+
+        Await.result(stream.requestClose("close stream"));
+        assertEquals("Stream " + streamName + " is set to " + StreamStatus.CLOSED,
+                StreamStatus.CLOSED, stream.getStatus());
+
+        // Every op queued before the close must have been failed, in submission order.
+        for (Future<WriteResponse> pending : pendingResults) {
+            WriteResponse response = Await.result(pending);
+            assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE,
+                    StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
+        }
+    }
+
+    /**
+     * Requests a close twice on a stream that still holds queued writes. Both close
+     * futures must complete with the stream in {@code StreamStatus.CLOSED}, and every
+     * queued write must complete with {@code StatusCode.STREAM_UNAVAILABLE}.
+     */
+    @Test(timeout = 60000)
+    public void testCloseTwice() throws Exception {
+        String streamName = testName.getMethodName();
+        StreamImpl stream = createUnstartedStream(service, streamName);
+
+        int numWrites = 10;
+        List<Future<WriteResponse>> pendingResults = new ArrayList<Future<WriteResponse>>(numWrites);
+        for (int seq = 0; seq < numWrites; seq++) {
+            WriteOp pendingOp = createWriteOp(service, streamName, seq);
+            stream.submit(pendingOp);
+            pendingResults.add(pendingOp.result());
+        }
+        assertEquals(numWrites, stream.numPendingOps());
+
+        // first close request: the stream may already have reached CLOSED
+        Future<Void> firstClose = stream.requestClose("close 0");
+        boolean closingAfterFirst = StreamStatus.CLOSING == stream.getStatus()
+                || StreamStatus.CLOSED == stream.getStatus();
+        assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
+                closingAfterFirst);
+
+        // second close request must leave the stream closing or closed as well
+        Future<Void> secondClose = stream.requestClose("close 1");
+        boolean closingAfterSecond = StreamStatus.CLOSING == stream.getStatus()
+                || StreamStatus.CLOSED == stream.getStatus();
+        assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
+                closingAfterSecond);
+
+        Await.result(firstClose);
+        assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
+                StreamStatus.CLOSED, stream.getStatus());
+        Await.result(secondClose);
+        assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
+                StreamStatus.CLOSED, stream.getStatus());
+
+        for (Future<WriteResponse> pendingResult : pendingResults) {
+            WriteResponse response = Await.result(pendingResult);
+            assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE,
+                    StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testFailRequestsDuringClosing() throws Exception {
+        String streamName = testName.getMethodName();
+        StreamImpl s = createUnstartedStream(service, streamName);
+
+        Future<Void> closeFuture = s.requestClose("close");
+        assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
+                StreamStatus.CLOSING == s.getStatus()
+                    || StreamStatus.CLOSED == s.getStatus());
+        WriteOp op1 = createWriteOp(service, streamName, 0L);
+        s.submit(op1);
+        WriteResponse response1 = Await.result(op1.result());
+        assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closing",
+                StatusCode.STREAM_UNAVAILABLE, response1.getHeader().getCode());
+
+        Await.result(closeFuture);
+        assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
+                StreamStatus.CLOSED, s.getStatus());
+        WriteOp op2 = createWriteOp(service, streamName, 1L);
+        s.submit(op2);
+        WriteResponse response2 = Await.result(op2.result());
+        assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closed",
+                StatusCode.STREAM_UNAVAILABLE, response2.getHeader().getCode());
+    }
+
+    /**
+     * Runs a local service configured with a 200ms service timeout and a writer that
+     * never flushes on its own (maximum output buffer, immediate and periodic flush
+     * disabled). The test waits for the stream to enter CLOSING/CLOSED, then checks that
+     * every buffered write completed with one of the accepted failure codes and that the
+     * stream disappears from both the cached and the acquired maps.
+     *
+     * <p>The local service is shut down in a {@code finally} block so that a failing
+     * assertion does not leave it running for the tests that follow.
+     */
+    @Test(timeout = 60000)
+    public void testServiceTimeout() throws Exception {
+        DistributedLogConfiguration confLocal = newLocalConf();
+        confLocal.setOutputBufferSize(Integer.MAX_VALUE)
+                .setImmediateFlushEnabled(false)
+                .setPeriodicFlushFrequencyMilliSeconds(0);
+        ServerConfiguration serverConfLocal = newLocalServerConf();
+        serverConfLocal.addConfiguration(serverConf);
+        serverConfLocal.setServiceTimeoutMs(200)
+                .setStreamProbationTimeoutMs(100);
+        String streamName = testName.getMethodName();
+        // create a new service with 200ms timeout
+        DistributedLogServiceImpl localService = createService(serverConfLocal, confLocal);
+        try {
+            StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
+
+            int numWrites = 10;
+            List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
+            for (int i = 0; i < numWrites; i++) {
+                futureList.add(localService.write(streamName, createRecord(i)));
+            }
+
+            assertTrue("Stream " + streamName + " should be cached",
+                    streamManager.getCachedStreams().containsKey(streamName));
+
+            StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName);
+            // poll until the stream has been moved to CLOSING (or already CLOSED)
+            while (StreamStatus.CLOSING != s.getStatus()
+                && StreamStatus.CLOSED != s.getStatus()) {
+                TimeUnit.MILLISECONDS.sleep(20);
+            }
+            assertNotNull("Writer should be initialized", s.getWriter());
+            assertNull("No exception should be thrown", s.getLastException());
+            Future<Void> closeFuture = s.getCloseFuture();
+            Await.result(closeFuture);
+            for (int i = 0; i < numWrites; i++) {
+                Future<WriteResponse> future = futureList.get(i);
+                assertTrue("Write " + i + " should be completed once the stream is closed",
+                        future.isDefined());
+                StatusCode code = Await.result(future).getHeader().getCode();
+                assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + ", "
+                        + StatusCode.WRITE_EXCEPTION + " or " + StatusCode.WRITE_CANCELLED_EXCEPTION
+                        + " : " + code,
+                        StatusCode.BK_TRANSMIT_ERROR == code
+                            || StatusCode.WRITE_EXCEPTION == code
+                            || StatusCode.WRITE_CANCELLED_EXCEPTION == code);
+            }
+
+            // poll until the closed stream has been dropped from the cache
+            while (streamManager.getCachedStreams().containsKey(streamName)) {
+                TimeUnit.MILLISECONDS.sleep(20);
+            }
+
+            assertFalse("Stream should be removed from cache",
+                    streamManager.getCachedStreams().containsKey(streamName));
+            assertFalse("Stream should be removed from acquired cache",
+                    streamManager.getAcquiredStreams().containsKey(streamName));
+        } finally {
+            // always release the local service, even when an assertion above fails
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Creates a service from the shared {@code serverConf} and a fresh local
+     * configuration with output buffer size 0, immediate flush enabled and periodic
+     * flush frequency 0.
+     *
+     * @return the service returned by {@code createService}; callers shut it down themselves
+     * @throws Exception if {@code createService} fails
+     */
+    private DistributedLogServiceImpl createConfiguredLocalService() throws Exception {
+        DistributedLogConfiguration immediateFlushConf = newLocalConf();
+        immediateFlushConf.setOutputBufferSize(0)
+                .setImmediateFlushEnabled(true)
+                .setPeriodicFlushFrequencyMilliSeconds(0);
+        DistributedLogServiceImpl configuredService = createService(serverConf, immediateFlushConf);
+        return configuredService;
+    }
+
+    /**
+     * Builds a fresh buffer wrapping the bytes of the fixed payload {@code "test-data"}.
+     *
+     * <p>The payload is encoded as UTF-8 explicitly so the bytes (and any checksum
+     * computed over them) do not depend on the platform default charset.
+     *
+     * @return a new {@link ByteBuffer} wrapping a newly allocated byte array
+     */
+    private ByteBuffer getTestDataBuffer() {
+        return ByteBuffer.wrap("test-data".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Runs a local service with durable writes disabled on both the log configuration
+     * and the server configuration, and checks that every write response carries
+     * {@code DLSN.InvalidDLSN}.
+     *
+     * <p>The local service is shut down in a {@code finally} block so that a failing
+     * assertion does not leave it running for the tests that follow.
+     */
+    @Test(timeout = 60000)
+    public void testNonDurableWrite() throws Exception {
+        DistributedLogConfiguration confLocal = newLocalConf();
+        confLocal.setOutputBufferSize(Integer.MAX_VALUE)
+                .setImmediateFlushEnabled(false)
+                .setPeriodicFlushFrequencyMilliSeconds(0)
+                .setDurableWriteEnabled(false);
+        ServerConfiguration serverConfLocal = new ServerConfiguration();
+        serverConfLocal.addConfiguration(serverConf);
+        serverConfLocal.enableDurableWrite(false);
+        // effectively disable the service and probation timeouts for this test
+        serverConfLocal.setServiceTimeoutMs(Integer.MAX_VALUE)
+                .setStreamProbationTimeoutMs(Integer.MAX_VALUE);
+        String streamName = testName.getMethodName();
+        DistributedLogServiceImpl localService =
+                createService(serverConfLocal, confLocal);
+        try {
+            StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
+
+            int numWrites = 10;
+            List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
+            for (int i = 0; i < numWrites; i++) {
+                futureList.add(localService.write(streamName, createRecord(i)));
+            }
+            assertTrue("Stream " + streamName + " should be cached",
+                    streamManager.getCachedStreams().containsKey(streamName));
+            List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList));
+            for (WriteResponse wr : resultList) {
+                assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn()));
+            }
+        } finally {
+            // always release the local service, even when an assertion above fails
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Writes with a {@code WriteContext} that has no CRC32 set and expects
+     * {@code StatusCode.SUCCESS}.
+     *
+     * <p>The local service is shut down in a {@code finally} block so a failing
+     * assertion does not leak it.
+     */
+    @Test(timeout = 60000)
+    public void testWriteOpNoChecksum() throws Exception {
+        DistributedLogServiceImpl localService = createConfiguredLocalService();
+        try {
+            WriteContext ctx = new WriteContext();
+            Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx);
+            WriteResponse resp = Await.result(result);
+            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+        } finally {
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Truncates with a {@code WriteContext} that has no CRC32 set and expects
+     * {@code StatusCode.SUCCESS}.
+     *
+     * <p>The local service is shut down in a {@code finally} block so a failing
+     * assertion does not leak it.
+     */
+    @Test(timeout = 60000)
+    public void testTruncateOpNoChecksum() throws Exception {
+        DistributedLogServiceImpl localService = createConfiguredLocalService();
+        try {
+            WriteContext ctx = new WriteContext();
+            Future<WriteResponse> result =
+                    localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
+            WriteResponse resp = Await.result(result);
+            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+        } finally {
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Issues heartbeat, release and delete requests with a {@code WriteContext} that has
+     * no CRC32 set and expects {@code StatusCode.SUCCESS} for each of them.
+     *
+     * <p>The local service is shut down in a {@code finally} block so a failing
+     * assertion does not leak it.
+     */
+    @Test(timeout = 60000)
+    public void testStreamOpNoChecksum() throws Exception {
+        DistributedLogServiceImpl localService = createConfiguredLocalService();
+        try {
+            WriteContext ctx = new WriteContext();
+            HeartbeatOptions option = new HeartbeatOptions();
+            option.setSendHeartBeatToReader(true);
+
+            // heartbeat to acquire the stream and then release the stream
+            Future<WriteResponse> result = localService.heartbeatWithOptions("test", ctx, option);
+            WriteResponse resp = Await.result(result);
+            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+            result = localService.release("test", ctx);
+            resp = Await.result(result);
+            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+
+            // heartbeat to acquire the stream and then delete the stream
+            result = localService.heartbeatWithOptions("test", ctx, option);
+            resp = Await.result(result);
+            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+            result = localService.delete("test", ctx);
+            resp = Await.result(result);
+            assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+        } finally {
+            // shutdown the local service, even when an assertion above fails
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Writes with an arbitrary CRC32 value (999) in the context and expects
+     * {@code StatusCode.CHECKSUM_FAILED}.
+     *
+     * <p>The local service is shut down in a {@code finally} block so a failing
+     * assertion does not leak it.
+     */
+    @Test(timeout = 60000)
+    public void testWriteOpChecksumBadChecksum() throws Exception {
+        DistributedLogServiceImpl localService = createConfiguredLocalService();
+        try {
+            WriteContext ctx = new WriteContext().setCrc32(999);
+            Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx);
+            WriteResponse resp = Await.result(result);
+            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+        } finally {
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Computes the write checksum for stream {@code "test"} but sends the write to
+     * stream {@code "test1"}, and expects {@code StatusCode.CHECKSUM_FAILED}.
+     *
+     * <p>The local service is shut down in a {@code finally} block so a failing
+     * assertion does not leak it.
+     */
+    @Test(timeout = 60000)
+    public void testWriteOpChecksumBadStream() throws Exception {
+        DistributedLogServiceImpl localService = createConfiguredLocalService();
+        try {
+            WriteContext ctx = new WriteContext().setCrc32(
+                ProtocolUtils.writeOpCRC32("test", getTestDataBuffer().array()));
+            Future<WriteResponse> result = localService.writeWithContext("test1", getTestDataBuffer(), ctx);
+            WriteResponse resp = Await.result(result);
+            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+        } finally {
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Computes the write checksum over the original payload, then overwrites one byte
+     * of the payload before sending it, and expects {@code StatusCode.CHECKSUM_FAILED}.
+     *
+     * <p>The local service is shut down in a {@code finally} block so a failing
+     * assertion does not leak it.
+     */
+    @Test(timeout = 60000)
+    public void testWriteOpChecksumBadData() throws Exception {
+        DistributedLogServiceImpl localService = createConfiguredLocalService();
+        try {
+            ByteBuffer buffer = getTestDataBuffer();
+            WriteContext ctx = new WriteContext().setCrc32(
+                ProtocolUtils.writeOpCRC32("test", buffer.array()));
+
+            // Overwrite 1 byte to corrupt data.
+            buffer.put(1, (byte) 0xab);
+            Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx);
+            WriteResponse resp = Await.result(result);
+            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+        } finally {
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Issues heartbeat, release and delete requests with an arbitrary CRC32 value (999)
+     * in the context and expects {@code StatusCode.CHECKSUM_FAILED} for each of them.
+     *
+     * <p>The local service is shut down in a {@code finally} block so a failing
+     * assertion does not leak it.
+     */
+    @Test(timeout = 60000)
+    public void testStreamOpChecksumBadChecksum() throws Exception {
+        DistributedLogServiceImpl localService = createConfiguredLocalService();
+        try {
+            WriteContext ctx = new WriteContext().setCrc32(999);
+            Future<WriteResponse> result = localService.heartbeat("test", ctx);
+            WriteResponse resp = Await.result(result);
+            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+            result = localService.release("test", ctx);
+            resp = Await.result(result);
+            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+            result = localService.delete("test", ctx);
+            resp = Await.result(result);
+            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+        } finally {
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Truncates with an arbitrary CRC32 value (999) in the context and expects
+     * {@code StatusCode.CHECKSUM_FAILED}.
+     *
+     * <p>The local service is shut down in a {@code finally} block so a failing
+     * assertion does not leak it.
+     */
+    @Test(timeout = 60000)
+    public void testTruncateOpChecksumBadChecksum() throws Exception {
+        DistributedLogServiceImpl localService = createConfiguredLocalService();
+        try {
+            WriteContext ctx = new WriteContext().setCrc32(999);
+            Future<WriteResponse> result =
+                    localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
+            WriteResponse resp = Await.result(result);
+            assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+        } finally {
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Builds a {@code WriteOp} for the fixed payload {@code "test"} using null stats
+     * loggers, an identity partition converter, a default {@code ServerConfiguration}
+     * and the default access control manager.
+     *
+     * @param name value passed as the first constructor argument (callers pass the stream name)
+     * @param disabledFeature feature handed to the op; judging by the callers it toggles
+     *                        checksum validation (TODO confirm against {@code WriteOp})
+     * @param checksum checksum value handed to the op
+     * @return the newly constructed op
+     */
+    private WriteOp getWriteOp(String name, SettableFeature disabledFeature, Long checksum) {
+        ByteBuffer payload = ByteBuffer.wrap("test".getBytes());
+        NullStatsLogger firstStatsLogger = new NullStatsLogger();
+        NullStatsLogger secondStatsLogger = new NullStatsLogger();
+        IdentityStreamPartitionConverter partitionConverter = new IdentityStreamPartitionConverter();
+        ServerConfiguration defaultServerConf = new ServerConfiguration();
+        return new WriteOp(name,
+            payload,
+            firstStatsLogger,
+            secondStatsLogger,
+            partitionConverter,
+            defaultServerConf,
+            (byte) 0,
+            checksum,
+            false,
+            disabledFeature,
+            DefaultAccessControlManager.INSTANCE);
+    }
+
+    /**
+     * Builds two write ops carrying the same bogus checksum. With the feature value at 0
+     * {@code preExecute()} is expected to throw; after the feature is set to 1 the
+     * second op's {@code preExecute()} is expected to pass.
+     */
+    @Test(timeout = 60000)
+    public void testStreamOpBadChecksumWithChecksumDisabled() throws Exception {
+        String streamName = testName.getMethodName();
+
+        SettableFeature checksumDisabled = new SettableFeature("", 0);
+
+        WriteOp opWhileEnabled = getWriteOp(streamName, checksumDisabled, 919191L);
+        WriteOp opWhileDisabled = getWriteOp(streamName, checksumDisabled, 919191L);
+
+        try {
+            opWhileEnabled.preExecute();
+            fail("should have thrown");
+        } catch (Exception expected) {
+            // expected: the bogus checksum is rejected while the feature value is 0
+        }
+
+        checksumDisabled.set(1);
+        opWhileDisabled.preExecute();
+    }
+
+    /**
+     * Builds two write ops carrying the checksum computed by
+     * {@code ProtocolUtils.writeOpCRC32} for the op's own stream name and payload.
+     * {@code preExecute()} must pass both with the feature value at 1 and after it has
+     * been set back to 0.
+     */
+    @Test(timeout = 60000)
+    public void testStreamOpGoodChecksumWithChecksumDisabled() throws Exception {
+        String streamName = testName.getMethodName();
+        SettableFeature checksumDisabled = new SettableFeature("", 1);
+
+        // each op gets its own freshly computed (valid) checksum
+        Long firstChecksum = ProtocolUtils.writeOpCRC32(streamName, "test".getBytes());
+        WriteOp opWhileDisabled = getWriteOp(streamName, checksumDisabled, firstChecksum);
+        Long secondChecksum = ProtocolUtils.writeOpCRC32(streamName, "test".getBytes());
+        WriteOp opWhileEnabled = getWriteOp(streamName, checksumDisabled, secondChecksum);
+
+        opWhileDisabled.preExecute();
+        checksumDisabled.set(0);
+        opWhileEnabled.preExecute();
+    }
+
+    /**
+     * Buffers writes on several streams of a local service whose writer never flushes on
+     * its own, then calls {@code closeStreams()} and checks that every stream is closed,
+     * every write completed with an accepted status code, and both stream maps are empty.
+     *
+     * <p>The local service is shut down in a {@code finally} block so that a failing
+     * assertion does not leave it running for the tests that follow.
+     */
+    @Test(timeout = 60000)
+    public void testCloseStreamsShouldFlush() throws Exception {
+        DistributedLogConfiguration confLocal = newLocalConf();
+        confLocal.setOutputBufferSize(Integer.MAX_VALUE)
+                .setImmediateFlushEnabled(false)
+                .setPeriodicFlushFrequencyMilliSeconds(0);
+
+        String streamNamePrefix = testName.getMethodName();
+        DistributedLogServiceImpl localService = createService(serverConf, confLocal);
+        try {
+            StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
+
+            int numStreams = 10;
+            int numWrites = 10;
+            List<Future<WriteResponse>> futureList =
+                    Lists.newArrayListWithExpectedSize(numStreams * numWrites);
+            for (int i = 0; i < numStreams; i++) {
+                String streamName = streamNamePrefix + "-" + i;
+                HeartbeatOptions hbOptions = new HeartbeatOptions();
+                hbOptions.setSendHeartBeatToReader(true);
+                // make sure the first log segment of each stream created
+                FutureUtils.result(
+                        localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
+                for (int j = 0; j < numWrites; j++) {
+                    futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
+                }
+            }
+
+            assertEquals("There should be " + numStreams + " streams in cache",
+                    numStreams, streamManager.getCachedStreams().size());
+            // poll until every stream has been acquired
+            while (streamManager.getAcquiredStreams().size() < numStreams) {
+                TimeUnit.MILLISECONDS.sleep(20);
+            }
+
+            Future<List<Void>> closeResult = localService.closeStreams();
+            List<Void> closedStreams = Await.result(closeResult);
+            assertEquals("There should be " + numStreams + " streams closed",
+                    numStreams, closedStreams.size());
+            // all writes should be flushed
+            for (Future<WriteResponse> future : futureList) {
+                WriteResponse response = Await.result(future);
+                assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(),
+                        StatusCode.SUCCESS == response.getHeader().getCode()
+                            || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+                            || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode());
+            }
+            assertTrue("There should be no streams in the cache",
+                    streamManager.getCachedStreams().isEmpty());
+            assertTrue("There should be no streams in the acquired cache",
+                    streamManager.getAcquiredStreams().isEmpty());
+        } finally {
+            // always release the local service, even when an assertion above fails
+            localService.shutdown();
+        }
+    }
+
+    /**
+     * Buffers writes on several streams, forces every acquired stream into
+     * {@code StreamStatus.ERROR}, then calls {@code closeStreams()} and checks that every
+     * write completed with one of the accepted failure codes. The acquired map must be
+     * empty after the close, and the cached map must be empty after the service shutdown.
+     *
+     * <p>The local service is shut down in a {@code finally} block so that a failing
+     * assertion does not leave it running for the tests that follow.
+     */
+    @Test(timeout = 60000)
+    public void testCloseStreamsShouldAbort() throws Exception {
+        DistributedLogConfiguration confLocal = newLocalConf();
+        confLocal.setOutputBufferSize(Integer.MAX_VALUE)
+                .setImmediateFlushEnabled(false)
+                .setPeriodicFlushFrequencyMilliSeconds(0);
+
+        String streamNamePrefix = testName.getMethodName();
+        DistributedLogServiceImpl localService = createService(serverConf, confLocal);
+        StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
+        try {
+            int numStreams = 10;
+            int numWrites = 10;
+            List<Future<WriteResponse>> futureList =
+                    Lists.newArrayListWithExpectedSize(numStreams * numWrites);
+            for (int i = 0; i < numStreams; i++) {
+                String streamName = streamNamePrefix + "-" + i;
+                HeartbeatOptions hbOptions = new HeartbeatOptions();
+                hbOptions.setSendHeartBeatToReader(true);
+                // make sure the first log segment of each stream created
+                FutureUtils.result(
+                        localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
+                for (int j = 0; j < numWrites; j++) {
+                    futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
+                }
+            }
+
+            assertEquals("There should be " + numStreams + " streams in cache",
+                    numStreams, streamManager.getCachedStreams().size());
+            // poll until every stream has been acquired
+            while (streamManager.getAcquiredStreams().size() < numStreams) {
+                TimeUnit.MILLISECONDS.sleep(20);
+            }
+
+            // force every acquired stream into the ERROR status before closing
+            for (Stream s : streamManager.getAcquiredStreams().values()) {
+                StreamImpl stream = (StreamImpl) s;
+                stream.setStatus(StreamStatus.ERROR);
+            }
+
+            Future<List<Void>> closeResult = localService.closeStreams();
+            List<Void> closedStreams = Await.result(closeResult);
+            assertEquals("There should be " + numStreams + " streams closed",
+                    numStreams, closedStreams.size());
+            // all writes should have failed or been rejected
+            for (Future<WriteResponse> future : futureList) {
+                WriteResponse response = Await.result(future);
+                assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : "
+                        + response.getHeader().getCode(),
+                        StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
+                            || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+                            || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
+            }
+            // acquired streams should all been removed after we close them
+            assertTrue("There should be no streams in the acquired cache",
+                streamManager.getAcquiredStreams().isEmpty());
+        } finally {
+            // always release the local service, even when an assertion above fails
+            localService.shutdown();
+        }
+        // cached streams wouldn't be removed immediately after streams are closed
+        // but they should be removed after we shutdown the service
+        assertTrue("There should be no streams in the cache after shutting down the service",
+            streamManager.getCachedStreams().isEmpty());
+    }
+
+    /**
+     * Shuts the shared service down and checks that a subsequent write is answered with
+     * {@code StatusCode.SERVICE_UNAVAILABLE} and that no stream is cached or acquired.
+     */
+    @Test(timeout = 60000)
+    public void testShutdown() throws Exception {
+        service.shutdown();
+        StreamManagerImpl manager = (StreamManagerImpl) service.getStreamManager();
+
+        // a write issued after shutdown must be rejected
+        Future<WriteResponse> writeAfterShutdown =
+                service.write(testName.getMethodName(), createRecord(0L));
+        WriteResponse rejected = Await.result(writeAfterShutdown);
+        assertEquals("Write should fail with " + StatusCode.SERVICE_UNAVAILABLE,
+                StatusCode.SERVICE_UNAVAILABLE, rejected.getHeader().getCode());
+
+        assertTrue("There should be no streams created after shutdown",
+                manager.getCachedStreams().isEmpty());
+        assertTrue("There should be no streams acquired after shutdown",
+                manager.getAcquiredStreams().isEmpty());
+    }
+
+    /**
+     * Checks {@code getOwner} for a stream the service has not touched ("stream-1") and
+     * for a stream the service has acquired through a write ("stream-2"). In both cases
+     * the response must be {@code StatusCode.FOUND} with the service's own address as
+     * the location.
+     */
+    @Test(timeout = 60000)
+    public void testGetOwner() throws Exception {
+        LocalRoutingService routingService = (LocalRoutingService) service.getRoutingService();
+        routingService
+                .addHost("stream-0", service.getServiceAddress().getSocketAddress())
+                .setAllowRetrySameHost(false);
+
+        service.startPlacementPolicy();
+
+        WriteResponse ownerResponse =
+                FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
+        assertEquals(StatusCode.FOUND, ownerResponse.getHeader().getCode());
+        assertEquals(service.getServiceAddress().toString(),
+                ownerResponse.getHeader().getLocation());
+
+        // service cache "stream-2"
+        StreamImpl acquiredStream =
+                (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false);
+        // create write ops to stream-2 to make service acquire the stream
+        WriteOp acquiringOp = createWriteOp(service, "stream-2", 0L);
+        acquiredStream.submit(acquiringOp);
+        acquiredStream.start();
+        WriteResponse writeResponse = Await.result(acquiringOp.result());
+        assertEquals("Op should succeed",
+                StatusCode.SUCCESS, writeResponse.getHeader().getCode());
+        assertEquals("Service should acquire stream",
+                StreamStatus.INITIALIZED, acquiredStream.getStatus());
+        assertNotNull(acquiredStream.getManager());
+        assertNotNull(acquiredStream.getWriter());
+        assertNull(acquiredStream.getLastException());
+
+        // the stream is acquired
+        ownerResponse = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
+        assertEquals(StatusCode.FOUND, ownerResponse.getHeader().getCode());
+        assertEquals(service.getServiceAddress().toString(),
+                ownerResponse.getHeader().getLocation());
+    }
+
+}