Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:43 UTC

[38/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
new file mode 100644
index 0000000..d2d61a9
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Addresses;
+import com.twitter.finagle.addr.WeightedAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for {@link RoutingService}.
+ */
+@RunWith(Parameterized.class)
+public class TestRoutingService {
+
+    static final Logger LOG = LoggerFactory.getLogger(TestRoutingService.class);
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> configs() {
+        ArrayList<Object[]> list = new ArrayList<Object[]>();
+        for (int i = 0; i <= 1; i++) {
+            for (int j = 0; j <= 1; j++) {
+                for (int k = 0; k <= 1; k++) {
+                    list.add(new Boolean[] {i == 1, j == 1, k == 1});
+                }
+            }
+        }
+        return list;
+    }
+
+    private final boolean consistentHash;
+    private final boolean weightedAddresses;
+    private final boolean asyncResolution;
+
+    public TestRoutingService(boolean consistentHash, boolean weightedAddresses, boolean asyncResolution) {
+        this.consistentHash = consistentHash;
+        this.weightedAddresses = weightedAddresses;
+        this.asyncResolution = asyncResolution;
+    }
+
+    private List<Address> getAddresses(boolean weightedAddresses) {
+        ArrayList<Address> addresses = new ArrayList<Address>();
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.2", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.3", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.4", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.5", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.6", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.7", 3181)));
+
+        if (weightedAddresses) {
+            ArrayList<Address> wAddresses = new ArrayList<Address>();
+            for (Address address: addresses) {
+                wAddresses.add(WeightedAddress.apply(address, 1.0));
+            }
+            return wAddresses;
+        } else {
+            return addresses;
+        }
+    }
+
+    private void testRoutingServiceHelper(boolean consistentHash,
+                                          boolean weightedAddresses,
+                                          boolean asyncResolution)
+        throws Exception {
+        ExecutorService executorService = null;
+        final List<Address> addresses = getAddresses(weightedAddresses);
+        final TestName name = new TestName();
+        RoutingService routingService;
+        if (consistentHash) {
+            routingService = ConsistentHashRoutingService.newBuilder()
+                    .serverSet(new NameServerSet(name))
+                    .resolveFromName(true)
+                    .numReplicas(997)
+                    .build();
+        } else {
+            routingService = ServerSetRoutingService.newServerSetRoutingServiceBuilder()
+                    .serverSetWatcher(new TwitterServerSetWatcher(new NameServerSet(name), true)).build();
+        }
+
+        if (asyncResolution) {
+            executorService = Executors.newSingleThreadExecutor();
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    name.changeAddrs(addresses);
+                }
+            });
+        } else {
+            name.changeAddrs(addresses);
+        }
+        routingService.startService();
+
+        HashSet<SocketAddress> mapping = new HashSet<SocketAddress>();
+
+        for (int i = 0; i < 1000; i++) {
+            for (int j = 0; j < 5; j++) {
+                String stream = "TestStream-" + i + "-" + j;
+                mapping.add(routingService.getHost(stream,
+                        RoutingService.RoutingContext.of(new DefaultRegionResolver())));
+            }
+        }
+
+        assertEquals(mapping.size(), addresses.size());
+
+        if (null != executorService) {
+            executorService.shutdown();
+        }
+
+    }
+
+    @Test(timeout = 5000)
+    public void testRoutingService() throws Exception {
+        testRoutingServiceHelper(this.consistentHash, this.weightedAddresses, this.asyncResolution);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
new file mode 100644
index 0000000..ab0cb58
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import com.twitter.util.CountDownLatch;
+import com.twitter.util.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test {@link DefaultSpeculativeRequestExecutionPolicy}.
+ */
+public class TestDefaultSpeculativeRequestExecutionPolicy {
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testInvalidBackoffMultiplier() throws Exception {
+        new DefaultSpeculativeRequestExecutionPolicy(100, 200, -1);
+    }
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testInvalidMaxSpeculativeTimeout() throws Exception {
+        new DefaultSpeculativeRequestExecutionPolicy(100, Integer.MAX_VALUE, 2);
+    }
+
+    @Test(timeout = 20000)
+    public void testSpeculativeRequests() throws Exception {
+        DefaultSpeculativeRequestExecutionPolicy policy =
+                new DefaultSpeculativeRequestExecutionPolicy(10, 10000, 2);
+        SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class);
+
+        final AtomicInteger callCount = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(3);
+
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                try {
+                    return Future.value(callCount.incrementAndGet() < 3);
+                } finally {
+                    latch.countDown();
+                }
+            }
+        }).when(executor).issueSpeculativeRequest();
+
+        ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();
+        policy.initiateSpeculativeRequest(executorService, executor);
+
+        latch.await();
+
+        assertEquals(40, policy.getNextSpeculativeRequestTimeout());
+    }
+
+    @Test(timeout = 20000)
+    public void testSpeculativeRequestsWithMaxTimeout() throws Exception {
+        DefaultSpeculativeRequestExecutionPolicy policy =
+                new DefaultSpeculativeRequestExecutionPolicy(10, 15, 2);
+        SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class);
+
+        final AtomicInteger callCount = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(3);
+
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                try {
+                    return Future.value(callCount.incrementAndGet() < 3);
+                } finally {
+                    latch.countDown();
+                }
+            }
+        }).when(executor).issueSpeculativeRequest();
+
+        ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();
+        policy.initiateSpeculativeRequest(executorService, executor);
+
+        latch.await();
+
+        assertEquals(15, policy.getNextSpeculativeRequestTimeout());
+    }
+}
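
For reference, a back-of-the-envelope sketch (not part of this commit) of the backoff
arithmetic the two tests above exercise. The assumption here is that the policy multiplies
the current speculative timeout by the backoff multiplier after each speculative request and
caps it at the configured maximum, which is consistent with assertEquals(40, ...) and
assertEquals(15, ...).

// Sketch only: assumed doubling-with-cap behaviour of the speculative timeout.
public class SpeculativeBackoffSketch {

    public static void main(String[] args) {
        // (first timeout, max timeout, multiplier) as in testSpeculativeRequests:
        // the timeout has been advanced twice by the time the third request returns false.
        System.out.println(timeoutAfter(2, 10, 10000, 2)); // prints 40
        // as in testSpeculativeRequestsWithMaxTimeout: the cap of 15 wins.
        System.out.println(timeoutAfter(2, 10, 15, 2));    // prints 15
    }

    static int timeoutAfter(int speculativeRequests, int first, int max, int multiplier) {
        int timeout = first;
        for (int i = 0; i < speculativeRequests; i++) {
            timeout = Math.min(timeout * multiplier, max);
        }
        return timeout;
    }
}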

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java b/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
new file mode 100644
index 0000000..d2df9a5
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.junit.Assert.assertFalse;
+
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link org.apache.distributedlog.service.DistributedLogClientBuilder}.
+ */
+public class TestDistributedLogClientBuilder {
+
+    @Test(timeout = 60000)
+    public void testBuildClientsFromSameBuilder() throws Exception {
+        DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+                .name("build-clients-from-same-builder")
+                .clientId(ClientId$.MODULE$.apply("test-builder"))
+                .finagleNameStr("inet!127.0.0.1:7001")
+                .streamNameRegex(".*")
+                .handshakeWithClientInfo(true)
+                .clientBuilder(ClientBuilder.get()
+                    .hostConnectionLimit(1)
+                    .connectTimeout(Duration.fromSeconds(1))
+                    .tcpConnectTimeout(Duration.fromSeconds(1))
+                    .requestTimeout(Duration.fromSeconds(10)));
+        DistributedLogClient client1 = builder.build();
+        DistributedLogClient client2 = builder.build();
+        assertFalse(client1 == client2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-core/conf/log4j.properties b/distributedlog-core/conf/log4j.properties
index cafc888..38ab34d 100644
--- a/distributedlog-core/conf/log4j.properties
+++ b/distributedlog-core/conf/log4j.properties
@@ -32,11 +32,11 @@ log4j.logger.org.apache.zookeeper=INFO
 log4j.logger.org.apache.bookkeeper=INFO
 
 # redirect executor output to executors.log since slow op warnings can be quite verbose
-log4j.logger.com.twitter.distributedlog.util.MonitoredFuturePool=INFO, Executors
-log4j.logger.com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
+log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors
+log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
 log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
-log4j.additivity.com.twitter.distributedlog.util.MonitoredFuturePool=false
-log4j.additivity.com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
+log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
+log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
 log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
 
 log4j.appender.Executors=org.apache.log4j.RollingFileAppender

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/conf/zookeeper.conf.dynamic.template
----------------------------------------------------------------------
diff --git a/distributedlog-core/conf/zookeeper.conf.dynamic.template b/distributedlog-core/conf/zookeeper.conf.dynamic.template
index 4bda9f1..f4e35f5 100644
--- a/distributedlog-core/conf/zookeeper.conf.dynamic.template
+++ b/distributedlog-core/conf/zookeeper.conf.dynamic.template
@@ -1 +1 @@
-#/**# * Copyright 2007 The Apache Software Foundation# *# * Licensed to the Apache Software Foundation (ASF) under one# * or more contributor license agreements.  See the NOTICE file# * distributed with this work for additional information# * regarding copyright ownership.  The ASF licenses this file# * to you under the Apache License, Version 2.0 (the# * "License"); you may not use this file except in compliance# * with the License.  You may obtain a copy of the License at# *# *     http://www.apache.org/licenses/LICENSE-2.0# *# * Unless required by applicable law or agreed to in writing, software# * distributed under the License is distributed on an "AS IS" BASIS,# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# * See the License for the specific language governing permissions and# * limitations under the License.# */server.1=127.0.0.1:2710:3710:participant;0.0.0.0:2181
\ No newline at end of file
+#/**# * Copyright 2007 The Apache Software Foundation# *# * Licensed to the Apache Software Foundation (ASF) under one# * or more contributor license agreements.  See the NOTICE file# * distributed with this work for additional information# * regarding copyright ownership.  The ASF licenses this file# * to you under the Apache License, Version 2.0 (the# * "License"); you may not use this file except in compliance# * with the License.  You may obtain a copy of the License at# *# *     http://www.apache.org/licenses/LICENSE-2.0# *# * Unless required by applicable law or agreed to in writing, software# * distributed under the License is distributed on an "AS IS" BASIS,# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# * See the License for the specific language governing permissions and# * limitations under the License.# */server.1=127.0.0.1:2710:3710:participant;0.0.0.0:2181

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core/pom.xml b/distributedlog-core/pom.xml
index c5329aa..c4bfa8f 100644
--- a/distributedlog-core/pom.xml
+++ b/distributedlog-core/pom.xml
@@ -206,7 +206,7 @@
           <properties>
             <property>
               <name>listener</name>
-              <value>com.twitter.distributedlog.TimedOutTestsListener</value>
+              <value>org.apache.distributedlog.TimedOutTestsListener</value>
             </property>
           </properties>
         </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java
deleted file mode 100644
index 0f93bfe..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AppendOnlyStreamReader extends InputStream {
-    static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamReader.class);
-
-    private LogRecordWithInputStream currentLogRecord = null;
-    private final DistributedLogManager dlm;
-    private LogReader reader;
-    private long currentPosition;
-    private static final int SKIP_BUFFER_SIZE = 512;
-
-    // Cache the input stream for a log record.
-    private static class LogRecordWithInputStream {
-        private final InputStream payloadStream;
-        private final LogRecordWithDLSN logRecord;
-
-        LogRecordWithInputStream(LogRecordWithDLSN logRecord) {
-            Preconditions.checkNotNull(logRecord);
-
-            LOG.debug("Got record dlsn = {}, txid = {}, len = {}",
-                new Object[] {logRecord.getDlsn(), logRecord.getTransactionId(), logRecord.getPayload().length});
-
-            this.logRecord = logRecord;
-            this.payloadStream = logRecord.getPayLoadInputStream();
-        }
-
-        InputStream getPayLoadInputStream() {
-            return payloadStream;
-        }
-
-        LogRecordWithDLSN getLogRecord() {
-            return logRecord;
-        }
-
-        // The last txid of the log record is the position of the next byte in the stream.
-        // Subtract length to get starting offset.
-        long getOffset() {
-            return logRecord.getTransactionId() - logRecord.getPayload().length;
-        }
-    }
-
-    /**
-     * Construct ledger input stream
-     *
-     * @param dlm the Distributed Log Manager to access the stream
-     */
-    AppendOnlyStreamReader(DistributedLogManager dlm)
-        throws IOException {
-        this.dlm = dlm;
-        reader = dlm.getInputStream(0);
-        currentPosition = 0;
-    }
-
-    /**
-     * Get input stream representing next entry in the
-     * ledger.
-     *
-     * @return input stream, or null if no more entries
-     */
-    private LogRecordWithInputStream nextLogRecord() throws IOException {
-        return nextLogRecord(reader);
-    }
-
-    private static LogRecordWithInputStream nextLogRecord(LogReader reader) throws IOException {
-        LogRecordWithDLSN record = reader.readNext(false);
-
-        if (null != record) {
-            return new LogRecordWithInputStream(record);
-        } else {
-            record = reader.readNext(false);
-            if (null != record) {
-                return new LogRecordWithInputStream(record);
-            } else {
-                LOG.debug("No record");
-                return null;
-            }
-        }
-    }
-
-    @Override
-    public int read() throws IOException {
-        byte[] b = new byte[1];
-        if (read(b, 0, 1) != 1) {
-            return -1;
-        } else {
-            return b[0];
-        }
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        int read = 0;
-        if (currentLogRecord == null) {
-            currentLogRecord = nextLogRecord();
-            if (currentLogRecord == null) {
-                return read;
-            }
-        }
-
-        while (read < len) {
-            int thisread = currentLogRecord.getPayLoadInputStream().read(b, off + read, (len - read));
-            if (thisread == -1) {
-                currentLogRecord = nextLogRecord();
-                if (currentLogRecord == null) {
-                    return read;
-                }
-            } else {
-                LOG.debug("Offset saved = {}, persisted = {}",
-                    currentPosition, currentLogRecord.getLogRecord().getTransactionId());
-                currentPosition += thisread;
-                read += thisread;
-            }
-        }
-        return read;
-    }
-
-    /**
-     * Position the reader at the given offset. If we fail to skip to the desired position
-     * and don't hit end of stream, return false.
-     *
-     * @throws com.twitter.distributedlog.exceptions.EndOfStreamException if we attempt to
-     *         skip past the end of the stream.
-     */
-    public boolean skipTo(long position) throws IOException {
-
-        // No need to skip anywhere.
-        if (position == position()) {
-            return true;
-        }
-
-        LogReader skipReader = dlm.getInputStream(position);
-        LogRecordWithInputStream logRecord = null;
-        try {
-            logRecord = nextLogRecord(skipReader);
-        } catch (IOException ex) {
-            skipReader.close();
-            throw ex;
-        }
-
-        if (null == logRecord) {
-            return false;
-        }
-
-        // We may end up with a reader positioned *before* the requested position if
-        // we're near the tail and the writer is still active, or if the desired position
-        // is not at a log record payload boundary.
-        // Transaction ID gives us the starting position of the log record. Read ahead
-        // if necessary.
-        currentPosition = logRecord.getOffset();
-        currentLogRecord = logRecord;
-        LogReader oldReader = reader;
-        reader = skipReader;
-
-        // Close the oldreader after swapping AppendOnlyStreamReader state. Close may fail
-        // and we need to make sure it leaves AppendOnlyStreamReader in a consistent state.
-        oldReader.close();
-
-        byte[] skipBuffer = new byte[SKIP_BUFFER_SIZE];
-        while (currentPosition < position) {
-            long bytesToRead = Math.min(position - currentPosition, SKIP_BUFFER_SIZE);
-            long bytesRead = read(skipBuffer, 0, (int)bytesToRead);
-            if (bytesRead < bytesToRead) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
-    public long position() {
-        return currentPosition;
-    }
-}
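
A minimal usage sketch (not part of this diff) of the reader above: skip to a byte offset,
then read sequentially like any other InputStream. Obtaining the reader (e.g. from a
DistributedLogManager) is assumed to have happened elsewhere and is not shown here; the
import uses the pre-rename com.twitter.distributedlog package of the file above.

import com.twitter.distributedlog.AppendOnlyStreamReader;
import java.io.IOException;

public class AppendOnlyStreamReaderSketch {

    // Sketch only: the reader is assumed to be already opened.
    static void readFrom(AppendOnlyStreamReader reader, long offset) throws IOException {
        if (!reader.skipTo(offset)) {
            // Could not reach the requested offset (e.g. it is past the current tail).
            return;
        }
        byte[] buf = new byte[1024];
        int n = reader.read(buf, 0, buf.length); // reads across log record boundaries
        System.out.println("read " + n + " bytes, now at byte position " + reader.position());
    }
}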

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java
deleted file mode 100644
index aa0aef9..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AppendOnlyStreamWriter implements Closeable {
-    static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamWriter.class);
-
-    // Use a 1-length array to satisfy Java's inner class reference rules. Use primitive
-    // type because synchronized block is needed anyway.
-    final long[] syncPos = new long[1];
-    BKAsyncLogWriter logWriter;
-    long requestPos = 0;
-
-    public AppendOnlyStreamWriter(BKAsyncLogWriter logWriter, long pos) {
-        LOG.debug("initialize at position {}", pos);
-        this.logWriter = logWriter;
-        this.syncPos[0] = pos;
-        this.requestPos = pos;
-    }
-
-    public Future<DLSN> write(byte[] data) {
-        requestPos += data.length;
-        Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
-        return writeResult.addEventListener(new WriteCompleteListener(requestPos));
-    }
-
-    public void force(boolean metadata) throws IOException {
-        long pos = 0;
-        try {
-            pos = Await.result(logWriter.flushAndCommit());
-        } catch (IOException ioe) {
-            throw ioe;
-        } catch (Exception ex) {
-            LOG.error("unexpected exception in AppendOnlyStreamWriter.force ", ex);
-            throw new UnexpectedException("unexpected exception in AppendOnlyStreamWriter.force", ex);
-        }
-        synchronized (syncPos) {
-            syncPos[0] = pos;
-        }
-    }
-
-    public long position() {
-        synchronized (syncPos) {
-            return syncPos[0];
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        logWriter.closeAndComplete();
-    }
-
-    public void markEndOfStream() throws IOException {
-        try {
-            Await.result(logWriter.markEndOfStream());
-        } catch (IOException ioe) {
-            throw ioe;
-        } catch (Exception ex) {
-            throw new UnexpectedException("Mark end of stream hit unexpected exception", ex);
-        }
-    }
-
-    class WriteCompleteListener implements FutureEventListener<DLSN> {
-        private final long position;
-        public WriteCompleteListener(long position) {
-            this.position = position;
-        }
-        @Override
-        public void onSuccess(DLSN response) {
-            synchronized (syncPos) {
-                if (position > syncPos[0]) {
-                    syncPos[0] = position;
-                }
-            }
-        }
-        @Override
-        public void onFailure(Throwable cause) {
-            // Handled at the layer above
-        }
-    }
-}
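
A minimal usage sketch (not part of this diff) of the writer above: write a payload, force
the stream to advance the sync position, and read the persisted position back. Opening the
writer is assumed to have happened elsewhere and is not shown here.

import com.twitter.distributedlog.AppendOnlyStreamWriter;
import com.twitter.util.Await;

public class AppendOnlyStreamWriterSketch {

    // Sketch only: the writer is assumed to be already opened.
    static void append(AppendOnlyStreamWriter writer, byte[] payload) throws Exception {
        Await.result(writer.write(payload)); // write() returns a Future<DLSN>; block for brevity
        writer.force(false);                 // flush and commit, advancing the sync position
        System.out.println("synced up to byte position " + writer.position());
        writer.close();
    }
}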

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java
deleted file mode 100644
index 8e07797..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public interface AsyncLogReader extends AsyncCloseable {
-
-    /**
-     * Get stream name that the reader reads from.
-     *
-     * @return stream name.
-     */
-    public String getStreamName();
-
-    /**
-     * Read the next record from the log stream
-     *
-     * @return A promise that when satisfied will contain the Log Record with its DLSN.
-     */
-    public Future<LogRecordWithDLSN> readNext();
-
-    /**
-     * Read the next <i>numEntries</i> entries. The future is only satisfied with a non-empty list
-     * of entries. It does not block until exactly <i>numEntries</i> entries have been read; it is a
-     * best-effort call.
-     *
-     * @param numEntries
-     *          num entries
-     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
-     */
-    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries);
-
-    /**
-     * Read the next <i>numEntries</i> entries within the given <i>waitTime</i>.
-     * <p>
-     * The future is satisfied when either <i>numEntries</i> entries have been read or <i>waitTime</i>
-     * has elapsed. The only exception is when no new entries are written within <i>waitTime</i>;
-     * in that case the call waits until new entries are available.
-     *
-     * @param numEntries
-     *          max entries to return
-     * @param waitTime
-     *          maximum time to wait if entries are already available to read
-     * @param timeUnit
-     *          wait time unit
-     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
-     */
-    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
-}
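
A minimal usage sketch (not part of this diff) of the interface above: a simple loop over the
Future-based readNext() call. Blocking with Await keeps the sketch short; real callers would
chain the futures instead. Opening the reader is assumed to have happened elsewhere.

import com.twitter.distributedlog.AsyncLogReader;
import com.twitter.distributedlog.LogRecordWithDLSN;
import com.twitter.util.Await;

public class AsyncLogReaderSketch {

    // Sketch only: the reader is assumed to be already opened.
    static void readRecords(AsyncLogReader reader, int numRecords) throws Exception {
        for (int i = 0; i < numRecords; i++) {
            LogRecordWithDLSN record = Await.result(reader.readNext());
            System.out.println(reader.getStreamName()
                    + " : dlsn=" + record.getDlsn()
                    + ", txid=" + record.getTransactionId()
                    + ", len=" + record.getPayload().length);
        }
    }
}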

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java
deleted file mode 100644
index e83e343..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.io.AsyncAbortable;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.io.Closeable;
-import java.util.List;
-
-public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable {
-
-    /**
-     * Get the last committed transaction id.
-     *
-     * @return last committed transaction id.
-     */
-    public long getLastTxId();
-
-    /**
-     * Write a log record to the stream.
-     *
-     * @param record single log record
-     * @return A Future which contains a DLSN if the record was successfully written
-     * or an exception if the write fails
-     */
-    public Future<DLSN> write(LogRecord record);
-
-    /**
-     * Write log records to the stream in bulk. Each future in the list represents the result of
-     * one write operation. The size of the result list is equal to the size of the input list.
-     * Buffers are written in order, and the list of result futures has the same order.
-     *
-     * @param record list of log records
-     * @return A Future which contains a list of Future DLSNs if the record was successfully written
-     * or an exception if the operation fails.
-     */
-    public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record);
-
-    /**
-     * Truncate the log until <i>dlsn</i>.
-     *
-     * @param dlsn
-     *          dlsn to truncate until.
-     * @return A Future indicates whether the operation succeeds or not, or an exception
-     * if the truncation fails.
-     */
-    public Future<Boolean> truncate(DLSN dlsn);
-
-    /**
-     * Get the name of the stream this writer writes data to
-     */
-    public String getStreamName();
-}
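
A minimal usage sketch (not part of this diff) of the interface above: write a record keyed by
a transaction id, then truncate the stream up to the DLSN the write returned. Opening the
writer is assumed to have happened elsewhere.

import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.LogRecord;
import com.twitter.util.Await;

public class AsyncLogWriterSketch {

    // Sketch only: the writer is assumed to be already opened.
    static void writeThenTruncate(AsyncLogWriter writer, long txid, byte[] payload) throws Exception {
        DLSN dlsn = Await.result(writer.write(new LogRecord(txid, payload)));
        boolean truncated = Await.result(writer.truncate(dlsn)); // truncate the log until dlsn
        System.out.println(writer.getStreamName() + " truncated until " + dlsn + " : " + truncated);
    }
}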

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
deleted file mode 100644
index bd71147..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-public interface AsyncNotification {
-    /**
-     * Triggered when the background activity encounters an exception.
-     *
-     * @param reason the exception that was encountered.
-     */
-    void notifyOnError(Throwable reason);
-
-    /**
-     *  Triggered when the background activity completes an operation
-     */
-    void notifyOnOperationComplete();
-}
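
For illustration only (not part of this diff), the shape of an AsyncNotification
implementation; the class name below is hypothetical.

import com.twitter.distributedlog.AsyncNotification;

public class LoggingAsyncNotification implements AsyncNotification {

    @Override
    public void notifyOnError(Throwable reason) {
        // The background activity hit an exception; log or surface it here.
        System.err.println("background activity failed: " + reason);
    }

    @Override
    public void notifyOnOperationComplete() {
        // The background activity completed an operation.
        System.out.println("background operation complete");
    }
}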

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
deleted file mode 100644
index d1c28d7..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.io.Abortable;
-import com.twitter.distributedlog.io.Abortables;
-import com.twitter.distributedlog.io.AsyncAbortable;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.PermitManager;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortable, AsyncAbortable {
-    static final Logger LOG = LoggerFactory.getLogger(BKAbstractLogWriter.class);
-
-    protected final DistributedLogConfiguration conf;
-    private final DynamicDistributedLogConfiguration dynConf;
-    protected final BKDistributedLogManager bkDistributedLogManager;
-
-    // States
-    private Promise<Void> closePromise = null;
-    private volatile boolean forceRolling = false;
-    private boolean forceRecovery = false;
-
-    // Truncation Related
-    private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null;
-    @VisibleForTesting
-    private Long minTimestampToKeepOverride = null;
-
-    // Log Segment Writers
-    protected BKLogSegmentWriter segmentWriter = null;
-    protected Future<BKLogSegmentWriter> segmentWriterFuture = null;
-    protected BKLogSegmentWriter allocatedSegmentWriter = null;
-    protected BKLogWriteHandler writeHandler = null;
-
-    BKAbstractLogWriter(DistributedLogConfiguration conf,
-                        DynamicDistributedLogConfiguration dynConf,
-                        BKDistributedLogManager bkdlm) {
-        this.conf = conf;
-        this.dynConf = dynConf;
-        this.bkDistributedLogManager = bkdlm;
-        LOG.debug("Initial retention period for {} : {}", bkdlm.getStreamName(),
-                TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS));
-    }
-
-    // manage write handler
-
-    synchronized protected BKLogWriteHandler getCachedWriteHandler() {
-        return writeHandler;
-    }
-
-    protected BKLogWriteHandler getWriteHandler() throws IOException {
-        BKLogWriteHandler writeHandler = createAndCacheWriteHandler();
-        writeHandler.checkMetadataException();
-        return writeHandler;
-    }
-
-    protected BKLogWriteHandler createAndCacheWriteHandler()
-            throws IOException {
-        synchronized (this) {
-            if (writeHandler != null) {
-                return writeHandler;
-            }
-        }
-        // This code path will be executed when the handler is not set or has been closed
-        // due to forceRecovery during testing
-        BKLogWriteHandler newHandler =
-                FutureUtils.result(bkDistributedLogManager.asyncCreateWriteHandler(false));
-        boolean success = false;
-        try {
-            synchronized (this) {
-                if (writeHandler == null) {
-                    writeHandler = newHandler;
-                    success = true;
-                }
-                return writeHandler;
-            }
-        } finally {
-            if (!success) {
-                newHandler.asyncAbort();
-            }
-        }
-    }
-
-    // manage log segment writers
-
-    protected synchronized BKLogSegmentWriter getCachedLogWriter() {
-        return segmentWriter;
-    }
-
-    protected synchronized Future<BKLogSegmentWriter> getCachedLogWriterFuture() {
-        return segmentWriterFuture;
-    }
-
-    protected synchronized void cacheLogWriter(BKLogSegmentWriter logWriter) {
-        this.segmentWriter = logWriter;
-        this.segmentWriterFuture = Future.value(logWriter);
-    }
-
-    protected synchronized BKLogSegmentWriter removeCachedLogWriter() {
-        try {
-            return segmentWriter;
-        } finally {
-            segmentWriter = null;
-            segmentWriterFuture = null;
-        }
-    }
-
-    protected synchronized BKLogSegmentWriter getAllocatedLogWriter() {
-        return allocatedSegmentWriter;
-    }
-
-    protected synchronized void cacheAllocatedLogWriter(BKLogSegmentWriter logWriter) {
-        this.allocatedSegmentWriter = logWriter;
-    }
-
-    protected synchronized BKLogSegmentWriter removeAllocatedLogWriter() {
-        try {
-            return allocatedSegmentWriter;
-        } finally {
-            allocatedSegmentWriter = null;
-        }
-    }
-
-    private Future<Void> asyncCloseAndComplete(boolean shouldThrow) {
-        BKLogSegmentWriter segmentWriter = getCachedLogWriter();
-        BKLogWriteHandler writeHandler = getCachedWriteHandler();
-        if (null != segmentWriter && null != writeHandler) {
-            cancelTruncation();
-            Promise<Void> completePromise = new Promise<Void>();
-            asyncCloseAndComplete(segmentWriter, writeHandler, completePromise, shouldThrow);
-            return completePromise;
-        } else {
-            return closeNoThrow();
-        }
-    }
-
-    private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter,
-                                       final BKLogWriteHandler writeHandler,
-                                       final Promise<Void> completePromise,
-                                       final boolean shouldThrow) {
-        writeHandler.completeAndCloseLogSegment(segmentWriter)
-                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
-                    @Override
-                    public void onSuccess(LogSegmentMetadata segment) {
-                        removeCachedLogWriter();
-                        complete(null);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        LOG.error("Completing Log segments encountered exception", cause);
-                        complete(cause);
-                    }
-
-                    private void complete(final Throwable cause) {
-                        closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() {
-                            @Override
-                            public BoxedUnit apply() {
-                                if (null != cause && shouldThrow) {
-                                    FutureUtils.setException(completePromise, cause);
-                                } else {
-                                    FutureUtils.setValue(completePromise, null);
-                                }
-                                return BoxedUnit.UNIT;
-                            }
-                        });
-                    }
-                });
-    }
-
-    @VisibleForTesting
-    void closeAndComplete() throws IOException {
-        FutureUtils.result(asyncCloseAndComplete(true));
-    }
-
-    protected Future<Void> asyncCloseAndComplete() {
-        return asyncCloseAndComplete(true);
-    }
-
-    @Override
-    public void close() throws IOException {
-        FutureUtils.result(asyncClose());
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        return asyncCloseAndComplete(false);
-    }
-
-    /**
-     * Close the writer and release all the underlying resources
-     */
-    protected Future<Void> closeNoThrow() {
-        Promise<Void> closeFuture;
-        synchronized (this) {
-            if (null != closePromise) {
-                return closePromise;
-            }
-            closeFuture = closePromise = new Promise<Void>();
-        }
-        cancelTruncation();
-        Utils.closeSequence(bkDistributedLogManager.getScheduler(),
-                true, /** ignore close errors **/
-                getCachedLogWriter(),
-                getAllocatedLogWriter(),
-                getCachedWriteHandler()
-        ).proxyTo(closeFuture);
-        return closeFuture;
-    }
-
-    @Override
-    public void abort() throws IOException {
-        FutureUtils.result(asyncAbort());
-    }
-
-    @Override
-    public Future<Void> asyncAbort() {
-        Promise<Void> closeFuture;
-        synchronized (this) {
-            if (null != closePromise) {
-                return closePromise;
-            }
-            closeFuture = closePromise = new Promise<Void>();
-        }
-        cancelTruncation();
-        Abortables.abortSequence(bkDistributedLogManager.getScheduler(),
-                getCachedLogWriter(),
-                getAllocatedLogWriter(),
-                getCachedWriteHandler()).proxyTo(closeFuture);
-        return closeFuture;
-    }
-
-    // used by sync writer
-    protected BKLogSegmentWriter getLedgerWriter(final long startTxId,
-                                                 final boolean allowMaxTxID)
-            throws IOException {
-        Future<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true);
-        BKLogSegmentWriter logSegmentWriter = null;
-        if (null != logSegmentWriterFuture) {
-            logSegmentWriter = FutureUtils.result(logSegmentWriterFuture);
-        }
-        if (null == logSegmentWriter || (shouldStartNewSegment(logSegmentWriter) || forceRolling)) {
-            logSegmentWriter = FutureUtils.result(rollLogSegmentIfNecessary(
-                    logSegmentWriter, startTxId, true /* bestEffort */, allowMaxTxID));
-        }
-        return logSegmentWriter;
-    }
-
-    // used by async writer
-    synchronized protected Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) {
-        final BKLogSegmentWriter ledgerWriter = getCachedLogWriter();
-        Future<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture();
-        if (null == ledgerWriterFuture || null == ledgerWriter) {
-            return null;
-        }
-
-        // Handle the case where the last call to write actually caused an error in the log
-        if ((ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError) {
-            // Close the ledger writer so that we will recover and start a new log segment
-            Future<Void> closeFuture;
-            if (ledgerWriter.isLogSegmentInError()) {
-                closeFuture = ledgerWriter.asyncAbort();
-            } else {
-                closeFuture = ledgerWriter.asyncClose();
-            }
-            return closeFuture.flatMap(
-                    new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() {
-                @Override
-                public Future<BKLogSegmentWriter> apply(Void result) {
-                    removeCachedLogWriter();
-
-                    if (ledgerWriter.isLogSegmentInError()) {
-                        return Future.value(null);
-                    }
-
-                    BKLogWriteHandler writeHandler;
-                    try {
-                        writeHandler = getWriteHandler();
-                    } catch (IOException e) {
-                        return Future.exception(e);
-                    }
-                    if (null != writeHandler && forceRecovery) {
-                        return writeHandler.completeAndCloseLogSegment(ledgerWriter)
-                                .map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() {
-                            @Override
-                            public BKLogSegmentWriter apply(LogSegmentMetadata completedLogSegment) {
-                                return null;
-                            }
-                        });
-                    } else {
-                        return Future.value(null);
-                    }
-                }
-            });
-        } else {
-            return ledgerWriterFuture;
-        }
-    }
-
-    boolean shouldStartNewSegment(BKLogSegmentWriter ledgerWriter) throws IOException {
-        BKLogWriteHandler writeHandler = getWriteHandler();
-        return null == ledgerWriter || writeHandler.shouldStartNewSegment(ledgerWriter) || forceRolling;
-    }
-
-    private void truncateLogSegmentsIfNecessary(BKLogWriteHandler writeHandler) {
-        boolean truncationEnabled = false;
-
-        long minTimestampToKeep = 0;
-
-        long retentionPeriodInMillis = TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS);
-        if (retentionPeriodInMillis > 0) {
-            minTimestampToKeep = Utils.nowInMillis() - retentionPeriodInMillis;
-            truncationEnabled = true;
-        }
-
-        if (null != minTimestampToKeepOverride) {
-            minTimestampToKeep = minTimestampToKeepOverride;
-            truncationEnabled = true;
-        }
-
-        // skip scheduling if there is a task that's already running
-        //
-        synchronized (this) {
-            if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) {
-                lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
-            }
-        }
-    }
-
-    private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
-                                                               final long startTxId,
-                                                               final boolean allowMaxTxID) {
-        return writeHandler.recoverIncompleteLogSegments()
-                .flatMap(new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() {
-            @Override
-            public Future<BKLogSegmentWriter> apply(Long lastTxId) {
-                return writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID)
-                        .onSuccess(new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(BKLogSegmentWriter newSegmentWriter) {
-                        cacheLogWriter(newSegmentWriter);
-                        return BoxedUnit.UNIT;
-                    }
-                });
-            }
-        });
-    }
-
-    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(
-            final BKLogSegmentWriter oldSegmentWriter,
-            final BKLogWriteHandler writeHandler,
-            final long startTxId,
-            final boolean bestEffort,
-            final boolean allowMaxTxID) {
-        final PermitManager.Permit switchPermit = bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit();
-        if (switchPermit.isAllowed()) {
-            return closeOldLogSegmentAndStartNewOne(
-                    oldSegmentWriter,
-                    writeHandler,
-                    startTxId,
-                    bestEffort,
-                    allowMaxTxID
-            ).rescue(new Function<Throwable, Future<BKLogSegmentWriter>>() {
-                @Override
-                public Future<BKLogSegmentWriter> apply(Throwable cause) {
-                    if (cause instanceof LockingException) {
-                        LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ",
-                                writeHandler.getFullyQualifiedName(), cause);
-                        bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
-                        return Future.value(oldSegmentWriter);
-                    } else if (cause instanceof ZKException) {
-                        ZKException zke = (ZKException) cause;
-                        if (ZKException.isRetryableZKException(zke)) {
-                            LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." +
-                                    " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(),
-                                    zke.getKeeperExceptionCode());
-                            bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
-                            return Future.value(oldSegmentWriter);
-                        }
-                    }
-                    return Future.exception(cause);
-                }
-            }).ensure(new AbstractFunction0<BoxedUnit>() {
-                @Override
-                public BoxedUnit apply() {
-                    bkDistributedLogManager.getLogSegmentRollingPermitManager()
-                            .releasePermit(switchPermit);
-                    return BoxedUnit.UNIT;
-                }
-            });
-        } else {
-            bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit);
-            return Future.value(oldSegmentWriter);
-        }
-    }
-
-    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(
-            final BKLogSegmentWriter oldSegmentWriter,
-            final BKLogWriteHandler writeHandler,
-            final long startTxId,
-            final boolean bestEffort,
-            final boolean allowMaxTxID) {
-        // we switch only when we could allocate a new log segment.
-        BKLogSegmentWriter newSegmentWriter = getAllocatedLogWriter();
-        if (null == newSegmentWriter) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Allocating a new log segment from {} for {}.", startTxId,
-                        writeHandler.getFullyQualifiedName());
-            }
-            return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID)
-                    .flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
-                        @Override
-                        public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) {
-                            if (null == newSegmentWriter) {
-                                if (bestEffort) {
-                                    return Future.value(oldSegmentWriter);
-                                } else {
-                                    return Future.exception(
-                                            new UnexpectedException("StartLogSegment returns null for bestEffort rolling"));
-                                }
-                            }
-                            cacheAllocatedLogWriter(newSegmentWriter);
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Allocated a new log segment from {} for {}.", startTxId,
-                                        writeHandler.getFullyQualifiedName());
-                            }
-                            return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, newSegmentWriter);
-                        }
-                    });
-        } else {
-            return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, newSegmentWriter);
-        }
-    }
-
-    private Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(
-            BKLogSegmentWriter oldSegmentWriter,
-            final BKLogSegmentWriter newSegmentWriter) {
-        final Promise<BKLogSegmentWriter> completePromise = new Promise<BKLogSegmentWriter>();
-        // complete the old log segment
-        writeHandler.completeAndCloseLogSegment(oldSegmentWriter)
-                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
-
-                    @Override
-                    public void onSuccess(LogSegmentMetadata value) {
-                        cacheLogWriter(newSegmentWriter);
-                        removeAllocatedLogWriter();
-                        FutureUtils.setValue(completePromise, newSegmentWriter);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        FutureUtils.setException(completePromise, cause);
-                    }
-                });
-        return completePromise;
-    }
-
-    synchronized protected Future<BKLogSegmentWriter> rollLogSegmentIfNecessary(
-            final BKLogSegmentWriter segmentWriter,
-            long startTxId,
-            boolean bestEffort,
-            boolean allowMaxTxID) {
-        final BKLogWriteHandler writeHandler;
-        try {
-            writeHandler = getWriteHandler();
-        } catch (IOException e) {
-            return Future.exception(e);
-        }
-        Future<BKLogSegmentWriter> rollPromise;
-        if (null != segmentWriter && (writeHandler.shouldStartNewSegment(segmentWriter) || forceRolling)) {
-            rollPromise = closeOldLogSegmentAndStartNewOneWithPermit(
-                    segmentWriter, writeHandler, startTxId, bestEffort, allowMaxTxID);
-        } else if (null == segmentWriter) {
-            rollPromise = asyncStartNewLogSegment(writeHandler, startTxId, allowMaxTxID);
-        } else {
-            rollPromise = Future.value(segmentWriter);
-        }
-        return rollPromise.map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() {
-            @Override
-            public BKLogSegmentWriter apply(BKLogSegmentWriter newSegmentWriter) {
-                if (segmentWriter == newSegmentWriter) {
-                    return newSegmentWriter;
-                }
-                truncateLogSegmentsIfNecessary(writeHandler);
-                return newSegmentWriter;
-            }
-        });
-    }
-
-    protected synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
-        if (null != closePromise) {
-            LOG.error("Executing " + operation + " on already closed Log Writer");
-            throw new AlreadyClosedException("Executing " + operation + " on already closed Log Writer");
-        }
-    }
-
-    @VisibleForTesting
-    public void setForceRolling(boolean forceRolling) {
-        this.forceRolling = forceRolling;
-    }
-
-    @VisibleForTesting
-    public synchronized void overRideMinTimeStampToKeep(Long minTimestampToKeepOverride) {
-        this.minTimestampToKeepOverride = minTimestampToKeepOverride;
-    }
-
-    protected synchronized void cancelTruncation() {
-        if (null != lastTruncationAttempt) {
-            FutureUtils.cancel(lastTruncationAttempt);
-            lastTruncationAttempt = null;
-        }
-    }
-
-    @VisibleForTesting
-    public synchronized void setForceRecovery(boolean forceRecovery) {
-        this.forceRecovery = forceRecovery;
-    }
-
-}
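
For reference, a small sketch (not part of this commit) of the retention arithmetic used by
truncateLogSegmentsIfNecessary above; the 72-hour retention period is an assumed stand-in for
dynConf.getRetentionPeriodHours().

import java.util.concurrent.TimeUnit;

public class RetentionCutoffSketch {

    public static void main(String[] args) {
        long retentionPeriodHours = 72; // assumption; normally dynConf.getRetentionPeriodHours()
        long retentionPeriodInMillis = TimeUnit.MILLISECONDS.convert(retentionPeriodHours, TimeUnit.HOURS);
        // Log segments whose records are older than this timestamp become eligible for purging.
        long minTimestampToKeep = System.currentTimeMillis() - retentionPeriodInMillis;
        System.out.println("purge log segments older than " + minTimestampToKeep);
    }
}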