You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:51 UTC

[19/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java
new file mode 100644
index 0000000..bed516d
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.util.FutureUtils;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link DistributedLogManagerImpl}.
+ */
+public class TestDistributedLogManagerImpl {
+
+    private final org.apache.distributedlog.api.DistributedLogManager impl =
+        mock(org.apache.distributedlog.api.DistributedLogManager.class);
+    private final DistributedLogManagerImpl manager = new DistributedLogManagerImpl(impl);
+
+    @Test
+    public void testGetStreamName() throws Exception {
+        String name = "test-get-stream-name";
+        when(impl.getStreamName()).thenReturn(name);
+        assertEquals(name, manager.getStreamName());
+        verify(impl, times(1)).getStreamName();
+    }
+
+    @Test
+    public void testGetNamespaceDriver() throws Exception {
+        NamespaceDriver driver = mock(NamespaceDriver.class);
+        when(impl.getNamespaceDriver()).thenReturn(driver);
+        assertEquals(driver, manager.getNamespaceDriver());
+        verify(impl, times(1)).getNamespaceDriver();
+    }
+
+    @Test
+    public void testGetLogSegments() throws Exception {
+        List<LogSegmentMetadata> segments = mock(List.class);
+        when(impl.getLogSegments()).thenReturn(segments);
+        assertEquals(segments, manager.getLogSegments());
+        verify(impl, times(1)).getLogSegments();
+    }
+
+    @Test
+    public void testRegisterListener() throws Exception {
+        LogSegmentListener listener = mock(LogSegmentListener.class);
+        manager.registerListener(listener);
+        verify(impl, times(1)).registerListener(listener);
+    }
+
+    @Test
+    public void testUnregisterListener() throws Exception {
+        LogSegmentListener listener = mock(LogSegmentListener.class);
+        manager.unregisterListener(listener);
+        verify(impl, times(1)).unregisterListener(listener);
+    }
+
+    @Test
+    public void testOpenAsyncLogWriter() throws Exception {
+        AsyncLogWriter writer = mock(AsyncLogWriter.class);
+        when(impl.openAsyncLogWriter()).thenReturn(CompletableFuture.completedFuture(writer));
+        assertEquals(writer, ((AsyncLogWriterImpl) FutureUtils.result(manager.openAsyncLogWriter())).getImpl());
+        verify(impl, times(1)).openAsyncLogWriter();
+    }
+
+    @Test
+    public void testStartLogSegmentNonPartitioned() throws Exception {
+        LogWriter writer = mock(LogWriter.class);
+        when(impl.startLogSegmentNonPartitioned()).thenReturn(writer);
+        assertEquals(writer, ((LogWriterImpl) manager.startLogSegmentNonPartitioned()).getImpl());
+        verify(impl, times(1)).startLogSegmentNonPartitioned();
+    }
+
+    @Test
+    public void testStartAsyncLogSegmentNonPartitioned() throws Exception {
+        AsyncLogWriter writer = mock(AsyncLogWriter.class);
+        when(impl.startAsyncLogSegmentNonPartitioned()).thenReturn(writer);
+        assertEquals(writer, ((AsyncLogWriterImpl) manager.startAsyncLogSegmentNonPartitioned()).getImpl());
+        verify(impl, times(1)).startAsyncLogSegmentNonPartitioned();
+    }
+
+    @Test
+    public void testGetAppendOnlyStreamWriter() throws Exception {
+        AppendOnlyStreamWriter writer = mock(AppendOnlyStreamWriter.class);
+        when(impl.getAppendOnlyStreamWriter()).thenReturn(writer);
+        assertEquals(writer, manager.getAppendOnlyStreamWriter());
+        verify(impl, times(1)).getAppendOnlyStreamWriter();
+    }
+
+    @Test
+    public void testGetAppendOnlyStreamReader() throws Exception {
+        AppendOnlyStreamReader reader = mock(AppendOnlyStreamReader.class);
+        when(impl.getAppendOnlyStreamReader()).thenReturn(reader);
+        assertEquals(reader, manager.getAppendOnlyStreamReader());
+        verify(impl, times(1)).getAppendOnlyStreamReader();
+    }
+
+    @Test
+    public void testGetInputStream() throws Exception {
+        LogReader reader = mock(LogReader.class);
+        when(impl.getInputStream(anyLong())).thenReturn(reader);
+        assertEquals(reader, ((LogReaderImpl) manager.getInputStream(1234L)).getImpl());
+        verify(impl, times(1)).getInputStream(eq(1234L));
+    }
+
+    @Test
+    public void testGetInputStream2() throws Exception {
+        DLSN dlsn = mock(DLSN.class);
+        LogReader reader = mock(LogReader.class);
+        when(impl.getInputStream(eq(dlsn))).thenReturn(reader);
+        assertEquals(reader, ((LogReaderImpl) manager.getInputStream(dlsn)).getImpl());
+        verify(impl, times(1)).getInputStream(eq(dlsn));
+    }
+
+    @Test
+    public void testOpenAsyncLogReader() throws Exception {
+        org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+        when(impl.openAsyncLogReader(eq(1234L))).thenReturn(CompletableFuture.completedFuture(reader));
+        assertEquals(reader,
+            ((AsyncLogReaderImpl) FutureUtils.result(manager.openAsyncLogReader(1234L))).getImpl());
+        verify(impl, times(1)).openAsyncLogReader(eq(1234L));
+    }
+
+    @Test
+    public void testOpenAsyncLogReader2() throws Exception {
+        DLSN dlsn = mock(DLSN.class);
+        org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+        when(impl.openAsyncLogReader(eq(dlsn))).thenReturn(CompletableFuture.completedFuture(reader));
+        assertEquals(reader,
+            ((AsyncLogReaderImpl) FutureUtils.result(manager.openAsyncLogReader(dlsn))).getImpl());
+        verify(impl, times(1)).openAsyncLogReader(eq(dlsn));
+    }
+
+    @Test
+    public void testGetAsyncLogReader() throws Exception {
+        org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+        when(impl.getAsyncLogReader(eq(1234L))).thenReturn(reader);
+        assertEquals(reader,
+            ((AsyncLogReaderImpl) manager.getAsyncLogReader(1234L)).getImpl());
+        verify(impl, times(1)).getAsyncLogReader(eq(1234L));
+    }
+
+    @Test
+    public void testGetAsyncLogReader2() throws Exception {
+        DLSN dlsn = mock(DLSN.class);
+        org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+        when(impl.getAsyncLogReader(eq(dlsn))).thenReturn(reader);
+        assertEquals(reader,
+            ((AsyncLogReaderImpl) manager.getAsyncLogReader(dlsn)).getImpl());
+        verify(impl, times(1)).getAsyncLogReader(eq(dlsn));
+    }
+
+    @Test
+    public void testOpenAsyncLogReaderWithLock() throws Exception {
+        DLSN dlsn = mock(DLSN.class);
+        org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+        when(impl.getAsyncLogReaderWithLock(eq(dlsn))).thenReturn(CompletableFuture.completedFuture(reader));
+        assertEquals(reader,
+            ((AsyncLogReaderImpl) FutureUtils.result(manager.getAsyncLogReaderWithLock(dlsn))).getImpl());
+        verify(impl, times(1)).getAsyncLogReaderWithLock(eq(dlsn));
+    }
+
+    @Test
+    public void testOpenAsyncLogReaderWithLock2() throws Exception {
+        String subscriberId = "test-subscriber";
+        DLSN dlsn = mock(DLSN.class);
+        org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+        when(impl.getAsyncLogReaderWithLock(eq(dlsn), eq(subscriberId)))
+            .thenReturn(CompletableFuture.completedFuture(reader));
+        assertEquals(reader,
+            ((AsyncLogReaderImpl) FutureUtils.result(manager.getAsyncLogReaderWithLock(dlsn, subscriberId))).getImpl());
+        verify(impl, times(1)).getAsyncLogReaderWithLock(eq(dlsn), eq(subscriberId));
+    }
+
+    @Test
+    public void testOpenAsyncLogReaderWithLock3() throws Exception {
+        String subscriberId = "test-subscriber";
+        org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+        when(impl.getAsyncLogReaderWithLock(eq(subscriberId)))
+            .thenReturn(CompletableFuture.completedFuture(reader));
+        assertEquals(reader,
+            ((AsyncLogReaderImpl) FutureUtils.result(manager.getAsyncLogReaderWithLock(subscriberId))).getImpl());
+        verify(impl, times(1)).getAsyncLogReaderWithLock(eq(subscriberId));
+    }
+
+    @Test
+    public void testGetDLSNNotLessThanTxId() throws Exception {
+        DLSN dlsn = mock(DLSN.class);
+        when(impl.getDLSNNotLessThanTxId(anyLong())).thenReturn(CompletableFuture.completedFuture(dlsn));
+        assertEquals(dlsn, FutureUtils.result(manager.getDLSNNotLessThanTxId(1234L)));
+        verify(impl, times(1)).getDLSNNotLessThanTxId(eq(1234L));
+    }
+
+    @Test
+    public void testGetLastLogRecord() throws Exception {
+        LogRecordWithDLSN record = mock(LogRecordWithDLSN.class);
+        when(impl.getLastLogRecord()).thenReturn(record);
+        assertEquals(record, manager.getLastLogRecord());
+        verify(impl, times(1)).getLastLogRecord();
+    }
+
+    @Test
+    public void testFirstTxId() throws Exception {
+        long txId = System.currentTimeMillis();
+        when(impl.getFirstTxId()).thenReturn(txId);
+        assertEquals(txId, manager.getFirstTxId());
+        verify(impl, times(1)).getFirstTxId();
+    }
+
+    @Test
+    public void testLastTxId() throws Exception {
+        long txId = System.currentTimeMillis();
+        when(impl.getLastTxId()).thenReturn(txId);
+        assertEquals(txId, manager.getLastTxId());
+        verify(impl, times(1)).getLastTxId();
+    }
+
+    @Test
+    public void testLastDLSN() throws Exception {
+        DLSN dlsn = mock(DLSN.class);
+        when(impl.getLastDLSN()).thenReturn(dlsn);
+        assertEquals(dlsn, manager.getLastDLSN());
+        verify(impl, times(1)).getLastDLSN();
+    }
+
+    @Test
+    public void testGetLastLogRecordAsync() throws Exception {
+        LogRecordWithDLSN record = mock(LogRecordWithDLSN.class);
+        when(impl.getLastLogRecordAsync()).thenReturn(CompletableFuture.completedFuture(record));
+        assertEquals(record, FutureUtils.result(manager.getLastLogRecordAsync()));
+        verify(impl, times(1)).getLastLogRecordAsync();
+    }
+
+    @Test
+    public void testLastTxIdAsync() throws Exception {
+        long txId = System.currentTimeMillis();
+        when(impl.getLastTxIdAsync()).thenReturn(CompletableFuture.completedFuture(txId));
+        assertEquals(txId, FutureUtils.result(manager.getLastTxIdAsync()).longValue());
+        verify(impl, times(1)).getLastTxIdAsync();
+    }
+
+    @Test
+    public void testLastDLSNAsync() throws Exception {
+        DLSN dlsn = mock(DLSN.class);
+        when(impl.getLastDLSNAsync()).thenReturn(CompletableFuture.completedFuture(dlsn));
+        assertEquals(dlsn, FutureUtils.result(manager.getLastDLSNAsync()));
+        verify(impl, times(1)).getLastDLSNAsync();
+    }
+
+    @Test
+    public void testFirstDLSNAsync() throws Exception {
+        DLSN dlsn = mock(DLSN.class);
+        when(impl.getFirstDLSNAsync()).thenReturn(CompletableFuture.completedFuture(dlsn));
+        assertEquals(dlsn, FutureUtils.result(manager.getFirstDLSNAsync()));
+        verify(impl, times(1)).getFirstDLSNAsync();
+    }
+
+    @Test
+    public void testGetLogRecordCount() throws Exception {
+        long count = System.currentTimeMillis();
+        when(impl.getLogRecordCount()).thenReturn(count);
+        assertEquals(count, manager.getLogRecordCount());
+        verify(impl, times(1)).getLogRecordCount();
+    }
+
+    @Test
+    public void testGetLogRecordCountAsync() throws Exception {
+        DLSN dlsn = mock(DLSN.class);
+        long count = System.currentTimeMillis();
+        when(impl.getLogRecordCountAsync(eq(dlsn))).thenReturn(CompletableFuture.completedFuture(count));
+        assertEquals(count, FutureUtils.result(manager.getLogRecordCountAsync(dlsn)).longValue());
+        verify(impl, times(1)).getLogRecordCountAsync(eq(dlsn));
+    }
+
+    @Test
+    public void testRecover() throws Exception {
+        manager.recover();
+        verify(impl, times(1)).recover();
+    }
+
+    @Test
+    public void testIsEndOfStreamMarked() throws Exception {
+        when(impl.isEndOfStreamMarked()).thenReturn(true);
+        assertTrue(manager.isEndOfStreamMarked());
+        verify(impl, times(1)).isEndOfStreamMarked();
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        manager.delete();
+        verify(impl, times(1)).delete();
+    }
+
+    @Test
+    public void testPurgeLogsOlderThan() throws Exception {
+        long minTxIdToKeep = System.currentTimeMillis();
+        manager.purgeLogsOlderThan(minTxIdToKeep);
+        verify(impl, times(1)).purgeLogsOlderThan(eq(minTxIdToKeep));
+    }
+
+    @Test
+    public void testGetSubscriptionsStore() throws Exception {
+        SubscriptionsStore ss = mock(SubscriptionsStore.class);
+        when(impl.getSubscriptionsStore()).thenReturn(ss);
+        assertEquals(ss, ((SubscriptionsStoreImpl) manager.getSubscriptionsStore()).getImpl());
+        verify(impl, times(1)).getSubscriptionsStore();
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        manager.close();
+        verify(impl, times(1)).close();
+    }
+
+    @Test
+    public void testAsyncClose() throws Exception {
+        when(impl.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+        FutureUtils.result(manager.asyncClose());
+        verify(impl, times(1)).asyncClose();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java
new file mode 100644
index 0000000..4adc386
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.util.FutureUtils;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link LogReaderImpl}.
+ */
+public class TestLogReaderImpl {
+
+    private final org.apache.distributedlog.api.LogReader underlying =
+        mock(org.apache.distributedlog.api.LogReader.class);
+    private final LogReaderImpl reader = new LogReaderImpl(underlying);
+
+    @Test
+    public void testReadNext() throws Exception {
+        reader.readNext(false);
+        verify(underlying, times(1)).readNext(eq(false));
+    }
+
+    @Test
+    public void testReadBulk() throws Exception {
+        reader.readBulk(false, 100);
+        verify(underlying, times(1)).readBulk(eq(false), eq(100));
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        reader.close();
+        verify(underlying, times(1)).close();
+    }
+
+    @Test
+    public void testAsyncClose() throws Exception {
+        when(underlying.asyncClose())
+            .thenReturn(CompletableFuture.completedFuture(null));
+        FutureUtils.result(reader.asyncClose());
+        verify(underlying, times(1)).asyncClose();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java
new file mode 100644
index 0000000..be69260
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link LogWriterImpl}.
+ */
+public class TestLogWriterImpl {
+
+    private final org.apache.distributedlog.api.LogWriter underlying =
+        mock(org.apache.distributedlog.api.LogWriter.class);
+    private final LogWriterImpl writer = new LogWriterImpl(underlying);
+
+    @Test
+    public void testWrite() throws Exception {
+        LogRecord record = mock(LogRecord.class);
+        writer.write(record);
+        verify(underlying, times(1)).write(eq(record));
+    }
+
+    @Test
+    public void testWriteBulk() throws Exception {
+        List<LogRecord> records = mock(List.class);
+        writer.writeBulk(records);
+        verify(underlying, times(1)).writeBulk(eq(records));
+    }
+
+    @Test
+    public void testSetReadyToFlush() throws Exception {
+        writer.setReadyToFlush();
+        verify(underlying, times(1)).setReadyToFlush();
+    }
+
+    @Test
+    public void testFlushAndSync() throws Exception {
+        writer.flushAndSync();
+        verify(underlying, times(1)).flushAndSync();
+    }
+
+    @Test
+    public void testMarkEndOfStream() throws Exception {
+        writer.markEndOfStream();
+        verify(underlying, times(1)).markEndOfStream();
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        writer.close();
+        verify(underlying, times(1)).close();
+    }
+
+    @Test
+    public void testAbort() throws Exception {
+        writer.abort();
+        verify(underlying, times(1)).abort();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java
new file mode 100644
index 0000000..e6573aa
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.util.FutureUtils;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link SubscriptionsStoreImpl}.
+ */
+public class TestSubscriptionStoreImpl {
+
+    private final SubscriptionsStore underlying = mock(SubscriptionsStore.class);
+    private final SubscriptionsStoreImpl store = new SubscriptionsStoreImpl(underlying);
+
+    @Test
+    public void testGetLastCommitPosition() throws Exception {
+        String subscriber = "test-subscriber";
+        DLSN dlsn = mock(DLSN.class);
+        when(underlying.getLastCommitPosition(anyString()))
+            .thenReturn(CompletableFuture.completedFuture(dlsn));
+        assertEquals(dlsn,
+            FutureUtils.result(store.getLastCommitPosition(subscriber)));
+        verify(underlying, times(1)).getLastCommitPosition(eq(subscriber));
+    }
+
+    @Test
+    public void testGetLastCommitPositions() throws Exception {
+        Map<String, DLSN> positions = mock(Map.class);
+        when(underlying.getLastCommitPositions())
+            .thenReturn(CompletableFuture.completedFuture(positions));
+        assertEquals(positions, FutureUtils.result(store.getLastCommitPositions()));
+        verify(underlying, times(1)).getLastCommitPositions();
+    }
+
+    @Test
+    public void testAdvanceCommitPosition() throws Exception {
+        String subscriber = "test-subscriber";
+        DLSN dlsn = mock(DLSN.class);
+        when(underlying.advanceCommitPosition(anyString(), any(DLSN.class)))
+            .thenReturn(CompletableFuture.completedFuture(null));
+        FutureUtils.result(store.advanceCommitPosition(subscriber, dlsn));
+        verify(underlying, times(1))
+            .advanceCommitPosition(eq(subscriber), eq(dlsn));
+    }
+
+    @Test
+    public void testDeleteSubscriber() throws Exception {
+        String subscriber = "test-subscriber";
+        when(underlying.deleteSubscriber(anyString()))
+            .thenReturn(CompletableFuture.completedFuture(true));
+        assertTrue(FutureUtils.result(store.deleteSubscriber(subscriber)));
+        verify(underlying, times(1)).deleteSubscriber(eq(subscriber));
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        store.close();
+        verify(underlying, times(1)).close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
new file mode 100644
index 0000000..78dcb2a
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.namespace;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.net.URI;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link DistributedLogNamespaceBuilder}.
+ */
+public class TestDistributedLogNamespaceBuilder {
+
+    private final NamespaceBuilder underlying = mock(NamespaceBuilder.class);
+    private final DistributedLogNamespaceBuilder builder = new DistributedLogNamespaceBuilder(underlying);
+
+    @Test
+    public void testConf() {
+        DistributedLogConfiguration conf = mock(DistributedLogConfiguration.class);
+        builder.conf(conf);
+        verify(underlying, times(1)).conf(eq(conf));
+    }
+
+    @Test
+    public void testDynConf() {
+        DynamicDistributedLogConfiguration conf = mock(DynamicDistributedLogConfiguration.class);
+        builder.dynConf(conf);
+        verify(underlying, times(1)).dynConf(eq(conf));
+    }
+
+    @Test
+    public void testUri() {
+        URI uri = URI.create("distributedlog://127.0.0.1/messaging/distributedlog");
+        builder.uri(uri);
+        verify(underlying, times(1)).uri(eq(uri));
+    }
+
+    @Test
+    public void testStatsLogger() {
+        StatsLogger statsLogger = mock(StatsLogger.class);
+        builder.statsLogger(statsLogger);
+        verify(underlying, times(1)).statsLogger(eq(statsLogger));
+    }
+
+    @Test
+    public void testPerLogStatsLogger() {
+        StatsLogger statsLogger = mock(StatsLogger.class);
+        builder.perLogStatsLogger(statsLogger);
+        verify(underlying, times(1)).perLogStatsLogger(eq(statsLogger));
+    }
+
+    @Test
+    public void testFeatureProvider() {
+        FeatureProvider provider = mock(FeatureProvider.class);
+        builder.featureProvider(provider);
+        verify(underlying, times(1)).featureProvider(eq(provider));
+    }
+
+    @Test
+    public void testClientId() {
+        String clientId = "test-client-id";
+        builder.clientId(clientId);
+        verify(underlying, times(1)).clientId(eq(clientId));
+    }
+
+    @Test
+    public void testRegionId() {
+        int regionId = 1234;
+        builder.regionId(regionId);
+        verify(underlying, times(1)).regionId(eq(regionId));
+    }
+
+    @Test
+    public void testBuild() throws Exception {
+        builder.build();
+        verify(underlying, times(1)).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java
new file mode 100644
index 0000000..b562fe4
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.namespace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Optional;
+import java.util.Iterator;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.junit.Test;
+
+/**
+ * Unit test verifying {@link DistributedLogNamespaceImpl} forwards each call once to its wrapped namespace.
+ */
+public class TestDistributedLogNamespaceImpl {
+
+    private final Namespace impl = mock(Namespace.class); // mocked delegate whose calls are verified
+    private final DistributedLogNamespaceImpl namespace = new DistributedLogNamespaceImpl(impl); // under test
+
+    @Test
+    public void testGetNamespaceDriver() {
+        NamespaceDriver driver = mock(NamespaceDriver.class);
+        when(impl.getNamespaceDriver()).thenReturn(driver);
+        assertEquals(driver, namespace.getNamespaceDriver());
+        verify(impl, times(1)).getNamespaceDriver();
+    }
+
+    @Test
+    public void testCreateLog() throws Exception {
+        String logName = "test-log-name";
+        namespace.createLog(logName);
+        verify(impl, times(1)).createLog(eq(logName));
+    }
+
+    @Test
+    public void testDeleteLog() throws Exception {
+        String logName = "test-log-name";
+        namespace.deleteLog(logName);
+        verify(impl, times(1)).deleteLog(eq(logName));
+    }
+
+    @Test
+    public void testOpenLog() throws Exception {
+        String logName = "test-open-log";
+        namespace.openLog(logName);
+        verify(impl, times(1)).openLog(eq(logName));
+    }
+
+    @Test
+    public void testOpenLog2() throws Exception {
+        String logName = "test-open-log";
+        namespace.openLog(logName, Optional.absent(), Optional.absent(), Optional.absent());
+        verify(impl, times(1))
+            .openLog(eq(logName), eq(Optional.absent()), eq(Optional.absent()), eq(Optional.absent()));
+    }
+
+    @Test
+    public void testLogExists() throws Exception {
+        String logName = "test-log-exists";
+        when(impl.logExists(anyString())).thenReturn(true);
+        assertTrue(namespace.logExists(logName));
+        verify(impl, times(1)).logExists(eq(logName));
+    }
+
+    @Test
+    public void testGetLogs() throws Exception {
+        Iterator<String> logs = mock(Iterator.class);
+        when(impl.getLogs()).thenReturn(logs);
+        assertEquals(logs, namespace.getLogs());
+        verify(impl, times(1)).getLogs();
+    }
+
+    @Test
+    public void testRegisterNamespaceListener() throws Exception {
+        NamespaceListener listener = mock(NamespaceListener.class);
+        namespace.registerNamespaceListener(listener);
+        verify(impl, times(1)).registerNamespaceListener(eq(listener));
+    }
+
+    @Test
+    public void testCreateAccessControlManager() throws Exception {
+        AccessControlManager manager = mock(AccessControlManager.class);
+        when(impl.createAccessControlManager()).thenReturn(manager);
+        assertEquals(manager, namespace.createAccessControlManager());
+        verify(impl, times(1)).createAccessControlManager();
+    }
+
+    @Test
+    public void testClose() {
+        namespace.close();
+        verify(impl, times(1)).close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-core/conf/log4j.properties b/distributedlog-core/conf/log4j.properties
index 930db8d..af1cf5f 100644
--- a/distributedlog-core/conf/log4j.properties
+++ b/distributedlog-core/conf/log4j.properties
@@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO
 log4j.logger.org.apache.bookkeeper=INFO
 
 # redirect executor output to executors.log since slow op warnings can be quite verbose
-log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors
-log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
 log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
-log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
-log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
 log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
 
 log4j.appender.Executors=org.apache.log4j.RollingFileAppender

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core/pom.xml b/distributedlog-core/pom.xml
index a4f7568..1ad51c2 100644
--- a/distributedlog-core/pom.xml
+++ b/distributedlog-core/pom.xml
@@ -26,6 +26,16 @@
   <name>Apache DistributedLog :: Core Library</name>
   <dependencies>
     <dependency>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-common</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-protocol</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>${zookeeper.version}</version>
@@ -41,50 +51,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>${junit.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.jmock</groupId>
-      <artifactId>jmock</artifactId>
-      <version>${jmock.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>${slf4j.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter.common</groupId>
-      <artifactId>stats-util</artifactId>
-      <version>${stats-util.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>util-core_2.11</artifactId>
-      <version>${finagle.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <version>${commons-lang3.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
       <version>0.9.3</version>
@@ -114,19 +80,28 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>commons-cli</groupId>
-      <artifactId>commons-cli</artifactId>
-      <version>${commons-cli.version}</version>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>${lombok.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-protocol</artifactId>
-      <version>${project.parent.version}</version>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>net.jpountz.lz4</groupId>
-      <artifactId>lz4</artifactId>
-      <version>${lz4.version}</version>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock</artifactId>
+      <version>${jmock.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
@@ -136,7 +111,7 @@
     </dependency> 
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-protocol</artifactId>
+      <artifactId>distributedlog-common</artifactId>
       <version>${project.parent.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
@@ -172,7 +147,7 @@
           <properties>
             <property>
               <name>listener</name>
-              <value>org.apache.distributedlog.TimedOutTestsListener</value>
+              <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value>
             </property>
           </properties>
         </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
index 1d96f0e..3a31907 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
@@ -18,10 +18,10 @@
 package org.apache.distributedlog;
 
 import com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.io.InputStream;
-
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
index 8278c68..dde78c2 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
@@ -17,14 +17,12 @@
  */
 package org.apache.distributedlog;
 
-import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
 import java.io.Closeable;
 import java.io.IOException;
-
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,16 +42,16 @@ public class AppendOnlyStreamWriter implements Closeable {
         this.requestPos = pos;
     }
 
-    public Future<DLSN> write(byte[] data) {
+    public CompletableFuture<DLSN> write(byte[] data) {
         requestPos += data.length;
-        Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
-        return writeResult.addEventListener(new WriteCompleteListener(requestPos));
+        CompletableFuture<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
+        return writeResult.whenComplete(new WriteCompleteListener(requestPos));
     }
 
     public void force(boolean metadata) throws IOException {
         long pos = 0;
         try {
-            pos = Await.result(logWriter.flushAndCommit());
+            pos = FutureUtils.result(logWriter.flushAndCommit());
         } catch (IOException ioe) {
             throw ioe;
         } catch (Exception ex) {
@@ -78,7 +76,7 @@ public class AppendOnlyStreamWriter implements Closeable {
 
     public void markEndOfStream() throws IOException {
         try {
-            Await.result(logWriter.markEndOfStream());
+            FutureUtils.result(logWriter.markEndOfStream());
         } catch (IOException ioe) {
             throw ioe;
         } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
deleted file mode 100644
index e3ace05..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public interface AsyncLogReader extends AsyncCloseable {
-
-    /**
-     * Get stream name that the reader reads from.
-     *
-     * @return stream name.
-     */
-    public String getStreamName();
-
-    /**
-     * Read the next record from the log stream
-     *
-     * @return A promise that when satisfied will contain the Log Record with its DLSN.
-     */
-    public Future<LogRecordWithDLSN> readNext();
-
-    /**
-     * Read next <i>numEntries</i> entries. The future is only satisfied with non-empty list
-     * of entries. It doesn't block until returning exact <i>numEntries</i>. It is a best effort
-     * call.
-     *
-     * @param numEntries
-     *          num entries
-     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
-     */
-    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries);
-
-    /**
-     * Read next <i>numEntries</i> entries in a given <i>waitTime</i>.
-     * <p>
-     * The future is satisfied when either reads <i>numEntries</i> entries or reaches <i>waitTime</i>.
-     * The only exception is if there isn't any new entries written within <i>waitTime</i>, it would
-     * wait until new entries are available.
-     *
-     * @param numEntries
-     *          max entries to return
-     * @param waitTime
-     *          maximum wait time if there are entries already for read
-     * @param timeUnit
-     *          wait time unit
-     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
-     */
-    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
deleted file mode 100644
index 53b393b..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.io.AsyncAbortable;
-import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.io.Closeable;
-import java.util.List;
-
-public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable {
-
-    /**
-     * Get the last committed transaction id.
-     *
-     * @return last committed transaction id.
-     */
-    public long getLastTxId();
-
-    /**
-     * Write a log record to the stream.
-     *
-     * @param record single log record
-     * @return A Future which contains a DLSN if the record was successfully written
-     * or an exception if the write fails
-     */
-    public Future<DLSN> write(LogRecord record);
-
-    /**
-     * Write log records to the stream in bulk. Each future in the list represents the result of
-     * one write operation. The size of the result list is equal to the size of the input list.
-     * Buffers are written in order, and the list of result futures has the same order.
-     *
-     * @param record set of log records
-     * @return A Future which contains a list of Future DLSNs if the record was successfully written
-     * or an exception if the operation fails.
-     */
-    public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record);
-
-    /**
-     * Truncate the log until <i>dlsn</i>.
-     *
-     * @param dlsn
-     *          dlsn to truncate until.
-     * @return A Future indicates whether the operation succeeds or not, or an exception
-     * if the truncation fails.
-     */
-    public Future<Boolean> truncate(DLSN dlsn);
-
-    /**
-     * Get the name of the stream this writer writes data to
-     */
-    public String getStreamName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
index c12bd10..367bb50 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog;
 
-public interface AsyncNotification {
+interface AsyncNotification {
     /**
      * Triggered when the background activity encounters an exception
      *

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
index 4a2ef30..8a0bffb 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
@@ -18,6 +18,9 @@
 package org.apache.distributedlog;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.AlreadyClosedException;
 import org.apache.distributedlog.exceptions.LockingException;
@@ -27,18 +30,12 @@ import org.apache.distributedlog.io.Abortable;
 import org.apache.distributedlog.io.Abortables;
 import org.apache.distributedlog.io.AsyncAbortable;
 import org.apache.distributedlog.io.AsyncCloseable;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -53,18 +50,18 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
     protected final BKDistributedLogManager bkDistributedLogManager;
 
     // States
-    private Promise<Void> closePromise = null;
+    private CompletableFuture<Void> closePromise = null;
     private volatile boolean forceRolling = false;
     private boolean forceRecovery = false;
 
     // Truncation Related
-    private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null;
+    private CompletableFuture<List<LogSegmentMetadata>> lastTruncationAttempt = null;
     @VisibleForTesting
     private Long minTimestampToKeepOverride = null;
 
     // Log Segment Writers
     protected BKLogSegmentWriter segmentWriter = null;
-    protected Future<BKLogSegmentWriter> segmentWriterFuture = null;
+    protected CompletableFuture<BKLogSegmentWriter> segmentWriterFuture = null;
     protected BKLogSegmentWriter allocatedSegmentWriter = null;
     protected BKLogWriteHandler writeHandler = null;
 
@@ -100,7 +97,7 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
         // This code path will be executed when the handler is not set or has been closed
         // due to forceRecovery during testing
         BKLogWriteHandler newHandler =
-                FutureUtils.result(bkDistributedLogManager.asyncCreateWriteHandler(false));
+                Utils.ioResult(bkDistributedLogManager.asyncCreateWriteHandler(false));
         boolean success = false;
         try {
             synchronized (this) {
@@ -123,13 +120,13 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
         return segmentWriter;
     }
 
-    protected synchronized Future<BKLogSegmentWriter> getCachedLogWriterFuture() {
+    protected synchronized CompletableFuture<BKLogSegmentWriter> getCachedLogWriterFuture() {
         return segmentWriterFuture;
     }
 
     protected synchronized void cacheLogWriter(BKLogSegmentWriter logWriter) {
         this.segmentWriter = logWriter;
-        this.segmentWriterFuture = Future.value(logWriter);
+        this.segmentWriterFuture = FutureUtils.value(logWriter);
     }
 
     protected synchronized BKLogSegmentWriter removeCachedLogWriter() {
@@ -157,12 +154,12 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
         }
     }
 
-    private Future<Void> asyncCloseAndComplete(boolean shouldThrow) {
+    private CompletableFuture<Void> asyncCloseAndComplete(boolean shouldThrow) {
         BKLogSegmentWriter segmentWriter = getCachedLogWriter();
         BKLogWriteHandler writeHandler = getCachedWriteHandler();
         if (null != segmentWriter && null != writeHandler) {
             cancelTruncation();
-            Promise<Void> completePromise = new Promise<Void>();
+            CompletableFuture<Void> completePromise = new CompletableFuture<Void>();
             asyncCloseAndComplete(segmentWriter, writeHandler, completePromise, shouldThrow);
             return completePromise;
         } else {
@@ -172,10 +169,10 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
 
     private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter,
                                        final BKLogWriteHandler writeHandler,
-                                       final Promise<Void> completePromise,
+                                       final CompletableFuture<Void> completePromise,
                                        final boolean shouldThrow) {
         writeHandler.completeAndCloseLogSegment(segmentWriter)
-                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+                .whenComplete(new FutureEventListener<LogSegmentMetadata>() {
                     @Override
                     public void onSuccess(LogSegmentMetadata segment) {
                         removeCachedLogWriter();
@@ -189,15 +186,11 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
                     }
 
                     private void complete(final Throwable cause) {
-                        closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() {
-                            @Override
-                            public BoxedUnit apply() {
-                                if (null != cause && shouldThrow) {
-                                    FutureUtils.setException(completePromise, cause);
-                                } else {
-                                    FutureUtils.setValue(completePromise, null);
-                                }
-                                return BoxedUnit.UNIT;
+                        FutureUtils.ensure(closeNoThrow(), () -> {
+                            if (null != cause && shouldThrow) {
+                                FutureUtils.completeExceptionally(completePromise, cause);
+                            } else {
+                                FutureUtils.complete(completePromise, null);
                             }
                         });
                     }
@@ -206,63 +199,67 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
 
     @VisibleForTesting
     void closeAndComplete() throws IOException {
-        FutureUtils.result(asyncCloseAndComplete(true));
+        Utils.ioResult(asyncCloseAndComplete(true));
     }
 
-    protected Future<Void> asyncCloseAndComplete() {
+    protected CompletableFuture<Void> asyncCloseAndComplete() {
         return asyncCloseAndComplete(true);
     }
 
     @Override
     public void close() throws IOException {
-        FutureUtils.result(asyncClose());
+        Utils.ioResult(asyncClose());
     }
 
     @Override
-    public Future<Void> asyncClose() {
+    public CompletableFuture<Void> asyncClose() {
         return asyncCloseAndComplete(false);
     }
 
     /**
      * Close the writer and release all the underlying resources
      */
-    protected Future<Void> closeNoThrow() {
-        Promise<Void> closeFuture;
+    protected CompletableFuture<Void> closeNoThrow() {
+        CompletableFuture<Void> closeFuture;
         synchronized (this) {
             if (null != closePromise) {
                 return closePromise;
             }
-            closeFuture = closePromise = new Promise<Void>();
+            closeFuture = closePromise = new CompletableFuture<Void>();
         }
         cancelTruncation();
-        Utils.closeSequence(bkDistributedLogManager.getScheduler(),
-                true, /** ignore close errors **/
-                getCachedLogWriter(),
-                getAllocatedLogWriter(),
-                getCachedWriteHandler()
-        ).proxyTo(closeFuture);
+        FutureUtils.proxyTo(
+            Utils.closeSequence(bkDistributedLogManager.getScheduler(),
+                    true, /* ignore close errors */
+                    getCachedLogWriter(),
+                    getAllocatedLogWriter(),
+                    getCachedWriteHandler()
+            ),
+            closeFuture);
         return closeFuture;
     }
 
     @Override
     public void abort() throws IOException {
-        FutureUtils.result(asyncAbort());
+        Utils.ioResult(asyncAbort());
     }
 
     @Override
-    public Future<Void> asyncAbort() {
-        Promise<Void> closeFuture;
+    public CompletableFuture<Void> asyncAbort() {
+        CompletableFuture<Void> closeFuture;
         synchronized (this) {
             if (null != closePromise) {
                 return closePromise;
             }
-            closeFuture = closePromise = new Promise<Void>();
+            closeFuture = closePromise = new CompletableFuture<Void>();
         }
         cancelTruncation();
-        Abortables.abortSequence(bkDistributedLogManager.getScheduler(),
-                getCachedLogWriter(),
-                getAllocatedLogWriter(),
-                getCachedWriteHandler()).proxyTo(closeFuture);
+        FutureUtils.proxyTo(
+            Abortables.abortSequence(bkDistributedLogManager.getScheduler(),
+                    getCachedLogWriter(),
+                    getAllocatedLogWriter(),
+                    getCachedWriteHandler()),
+            closeFuture);
         return closeFuture;
     }
 
@@ -270,22 +267,22 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
     protected BKLogSegmentWriter getLedgerWriter(final long startTxId,
                                                  final boolean allowMaxTxID)
             throws IOException {
-        Future<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true);
+        CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true);
         BKLogSegmentWriter logSegmentWriter = null;
         if (null != logSegmentWriterFuture) {
-            logSegmentWriter = FutureUtils.result(logSegmentWriterFuture);
+            logSegmentWriter = Utils.ioResult(logSegmentWriterFuture);
         }
         if (null == logSegmentWriter || (shouldStartNewSegment(logSegmentWriter) || forceRolling)) {
-            logSegmentWriter = FutureUtils.result(rollLogSegmentIfNecessary(
+            logSegmentWriter = Utils.ioResult(rollLogSegmentIfNecessary(
                     logSegmentWriter, startTxId, true /* bestEffort */, allowMaxTxID));
         }
         return logSegmentWriter;
     }
 
     // used by async writer
-    synchronized protected Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) {
+    synchronized protected CompletableFuture<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) {
         final BKLogSegmentWriter ledgerWriter = getCachedLogWriter();
-        Future<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture();
+        CompletableFuture<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture();
         if (null == ledgerWriterFuture || null == ledgerWriter) {
             return null;
         }
@@ -293,38 +290,38 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
         // Handle the case where the last call to write actually caused an error in the log
         if ((ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError) {
             // Close the ledger writer so that we will recover and start a new log segment
-            Future<Void> closeFuture;
+            CompletableFuture<Void> closeFuture;
             if (ledgerWriter.isLogSegmentInError()) {
                 closeFuture = ledgerWriter.asyncAbort();
             } else {
                 closeFuture = ledgerWriter.asyncClose();
             }
-            return closeFuture.flatMap(
-                    new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() {
+            return closeFuture.thenCompose(
+                    new Function<Void, CompletionStage<BKLogSegmentWriter>>() {
                 @Override
-                public Future<BKLogSegmentWriter> apply(Void result) {
+                public CompletableFuture<BKLogSegmentWriter> apply(Void result) {
                     removeCachedLogWriter();
 
                     if (ledgerWriter.isLogSegmentInError()) {
-                        return Future.value(null);
+                        return FutureUtils.value(null);
                     }
 
                     BKLogWriteHandler writeHandler;
                     try {
                         writeHandler = getWriteHandler();
                     } catch (IOException e) {
-                        return Future.exception(e);
+                        return FutureUtils.exception(e);
                     }
                     if (null != writeHandler && forceRecovery) {
                         return writeHandler.completeAndCloseLogSegment(ledgerWriter)
-                                .map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() {
+                                .thenApply(new Function<LogSegmentMetadata, BKLogSegmentWriter>() {
                             @Override
                             public BKLogSegmentWriter apply(LogSegmentMetadata completedLogSegment) {
                                 return null;
                             }
                         });
                     } else {
-                        return Future.value(null);
+                        return FutureUtils.value(null);
                     }
                 }
             });
@@ -357,32 +354,25 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
         // skip scheduling if there is task that's already running
         //
         synchronized (this) {
-            if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) {
+            if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDone())) {
                 lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
             }
         }
     }
 
-    private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
+    private CompletableFuture<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
                                                                final long startTxId,
                                                                final boolean allowMaxTxID) {
         return writeHandler.recoverIncompleteLogSegments()
-                .flatMap(new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() {
-            @Override
-            public Future<BKLogSegmentWriter> apply(Long lastTxId) {
-                return writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID)
-                        .onSuccess(new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(BKLogSegmentWriter newSegmentWriter) {
+            .thenCompose(
+                lastTxId -> writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID)
+                    .thenApply(newSegmentWriter -> {
                         cacheLogWriter(newSegmentWriter);
-                        return BoxedUnit.UNIT;
-                    }
-                });
-            }
-        });
+                        return newSegmentWriter;
+                    }));
     }
 
-    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(
+    private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(
             final BKLogSegmentWriter oldSegmentWriter,
             final BKLogWriteHandler writeHandler,
             final long startTxId,
@@ -390,47 +380,46 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
             final boolean allowMaxTxID) {
         final PermitManager.Permit switchPermit = bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit();
         if (switchPermit.isAllowed()) {
-            return closeOldLogSegmentAndStartNewOne(
-                    oldSegmentWriter,
-                    writeHandler,
-                    startTxId,
-                    bestEffort,
-                    allowMaxTxID
-            ).rescue(new Function<Throwable, Future<BKLogSegmentWriter>>() {
-                @Override
-                public Future<BKLogSegmentWriter> apply(Throwable cause) {
-                    if (cause instanceof LockingException) {
-                        LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ",
-                                writeHandler.getFullyQualifiedName(), cause);
-                        bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
-                        return Future.value(oldSegmentWriter);
-                    } else if (cause instanceof ZKException) {
-                        ZKException zke = (ZKException) cause;
-                        if (ZKException.isRetryableZKException(zke)) {
-                            LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." +
-                                    " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(),
-                                    zke.getKeeperExceptionCode());
+            return FutureUtils.ensure(
+                FutureUtils.rescue(
+                     closeOldLogSegmentAndStartNewOne(
+                            oldSegmentWriter,
+                            writeHandler,
+                            startTxId,
+                            bestEffort,
+                            allowMaxTxID
+                    ),
+                    // rescue function
+                    cause -> {
+                        if (cause instanceof LockingException) {
+                            LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ",
+                                    writeHandler.getFullyQualifiedName(), cause);
                             bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
-                            return Future.value(oldSegmentWriter);
+                            return FutureUtils.value(oldSegmentWriter);
+                        } else if (cause instanceof ZKException) {
+                            ZKException zke = (ZKException) cause;
+                            if (ZKException.isRetryableZKException(zke)) {
+                                LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." +
+                                        " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(),
+                                        zke.getKeeperExceptionCode());
+                                bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
+                                return FutureUtils.value(oldSegmentWriter);
+                            }
                         }
+                        return FutureUtils.exception(cause);
                     }
-                    return Future.exception(cause);
-                }
-            }).ensure(new AbstractFunction0<BoxedUnit>() {
-                @Override
-                public BoxedUnit apply() {
-                    bkDistributedLogManager.getLogSegmentRollingPermitManager()
-                            .releasePermit(switchPermit);
-                    return BoxedUnit.UNIT;
-                }
-            });
+                ),
+                // ensure function
+                () -> bkDistributedLogManager.getLogSegmentRollingPermitManager()
+                                .releasePermit(switchPermit)
+            );
         } else {
             bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit);
-            return Future.value(oldSegmentWriter);
+            return FutureUtils.value(oldSegmentWriter);
         }
     }
 
-    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(
+    private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(
             final BKLogSegmentWriter oldSegmentWriter,
             final BKLogWriteHandler writeHandler,
             final long startTxId,
@@ -444,14 +433,14 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
                         writeHandler.getFullyQualifiedName());
             }
             return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID)
-                    .flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
+                    .thenCompose(new Function<BKLogSegmentWriter, CompletableFuture<BKLogSegmentWriter>>() {
                         @Override
-                        public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) {
+                        public CompletableFuture<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) {
                             if (null == newSegmentWriter) {
                                 if (bestEffort) {
-                                    return Future.value(oldSegmentWriter);
+                                    return FutureUtils.value(oldSegmentWriter);
                                 } else {
-                                    return Future.exception(
+                                    return FutureUtils.exception(
                                             new UnexpectedException("StartLogSegment returns null for bestEffort rolling"));
                                 }
                             }
@@ -468,30 +457,30 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
         }
     }
 
-    private Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(
+    private CompletableFuture<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(
             BKLogSegmentWriter oldSegmentWriter,
             final BKLogSegmentWriter newSegmentWriter) {
-        final Promise<BKLogSegmentWriter> completePromise = new Promise<BKLogSegmentWriter>();
+        final CompletableFuture<BKLogSegmentWriter> completePromise = new CompletableFuture<BKLogSegmentWriter>();
         // complete the old log segment
         writeHandler.completeAndCloseLogSegment(oldSegmentWriter)
-                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+                .whenComplete(new FutureEventListener<LogSegmentMetadata>() {
 
                     @Override
                     public void onSuccess(LogSegmentMetadata value) {
                         cacheLogWriter(newSegmentWriter);
                         removeAllocatedLogWriter();
-                        FutureUtils.setValue(completePromise, newSegmentWriter);
+                        FutureUtils.complete(completePromise, newSegmentWriter);
                     }
 
                     @Override
                     public void onFailure(Throwable cause) {
-                        FutureUtils.setException(completePromise, cause);
+                        FutureUtils.completeExceptionally(completePromise, cause);
                     }
                 });
         return completePromise;
     }
 
-    synchronized protected Future<BKLogSegmentWriter> rollLogSegmentIfNecessary(
+    synchronized protected CompletableFuture<BKLogSegmentWriter> rollLogSegmentIfNecessary(
             final BKLogSegmentWriter segmentWriter,
             long startTxId,
             boolean bestEffort,
@@ -500,18 +489,18 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
         try {
             writeHandler = getWriteHandler();
         } catch (IOException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         }
-        Future<BKLogSegmentWriter> rollPromise;
+        CompletableFuture<BKLogSegmentWriter> rollPromise;
         if (null != segmentWriter && (writeHandler.shouldStartNewSegment(segmentWriter) || forceRolling)) {
             rollPromise = closeOldLogSegmentAndStartNewOneWithPermit(
                     segmentWriter, writeHandler, startTxId, bestEffort, allowMaxTxID);
         } else if (null == segmentWriter) {
             rollPromise = asyncStartNewLogSegment(writeHandler, startTxId, allowMaxTxID);
         } else {
-            rollPromise = Future.value(segmentWriter);
+            rollPromise = FutureUtils.value(segmentWriter);
         }
-        return rollPromise.map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() {
+        return rollPromise.thenApply(new Function<BKLogSegmentWriter, BKLogSegmentWriter>() {
             @Override
             public BKLogSegmentWriter apply(BKLogSegmentWriter newSegmentWriter) {
                 if (segmentWriter == newSegmentWriter) {
@@ -542,7 +531,7 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
 
     protected synchronized void cancelTruncation() {
         if (null != lastTruncationAttempt) {
-            FutureUtils.cancel(lastTruncationAttempt);
+            lastTruncationAttempt.cancel(true);
             lastTruncationAttempt = null;
         }
     }