You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:38 UTC

[06/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
index ccbfc44..4ad0bc0 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
@@ -17,24 +17,23 @@
  */
 package org.apache.distributedlog;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.exceptions.BKTransmitException;
 import org.apache.distributedlog.exceptions.EndOfStreamException;
 import org.apache.distributedlog.exceptions.WriteCancelledException;
 import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
-import org.apache.distributedlog.io.Abortables;
 import org.apache.distributedlog.lock.SessionLockFactory;
 import org.apache.distributedlog.lock.ZKDistributedLock;
 import org.apache.distributedlog.lock.ZKSessionLockFactory;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.common.util.PermitLimiter;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -42,14 +41,12 @@ import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import scala.runtime.AbstractFunction0;
 
 import java.io.IOException;
 import java.net.URI;
@@ -129,9 +126,9 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                                          boolean acquireLock)
             throws Exception {
         try {
-            Await.result(Utils.zkAsyncCreateFullPathOptimistic(zkClient, path, new byte[0],
+            Utils.ioResult(Utils.zkAsyncCreateFullPathOptimistic(zkClient, path, new byte[0],
                     ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-        } catch (KeeperException.NodeExistsException nee) {
+        } catch (ZKException zke) {
             // node already exists
         }
         SessionLockFactory lockFactory = new ZKSessionLockFactory(
@@ -150,7 +147,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 Long.MAX_VALUE,
                 NullStatsLogger.INSTANCE);
         if (acquireLock) {
-            return FutureUtils.result(lock.asyncAcquire());
+            return Utils.ioResult(lock.asyncAcquire());
         } else {
             return lock;
         }
@@ -158,9 +155,9 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
 
     private void closeWriterAndLock(BKLogSegmentWriter writer,
                                     ZKDistributedLock lock)
-            throws IOException {
+            throws Exception {
         try {
-            FutureUtils.result(writer.asyncClose());
+            Utils.ioResult(writer.asyncClose());
         } finally {
             Utils.closeQuietly(lock);
         }
@@ -170,7 +167,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                                     ZKDistributedLock lock)
             throws IOException {
         try {
-            Abortables.abort(writer, false);
+            Utils.abort(writer, false);
         } finally {
             Utils.closeQuietly(lock);
         }
@@ -231,10 +228,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
         ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
-        List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+        List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
         for (int i = 0; i < numRecords; i++) {
             futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
         }
@@ -248,7 +245,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 10, writer.getPositionWithinLogSegment());
         // close the writer should flush buffered data and release lock
         closeWriterAndLock(writer, lock);
-        Await.result(lockFuture0);
+        Utils.ioResult(lockFuture0);
         lock0.checkOwnership();
         assertEquals("Last tx id should still be " + (numRecords - 1),
                 numRecords - 1, writer.getLastTxId());
@@ -256,7 +253,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 numRecords - 1, writer.getLastTxIdAcknowledged());
         assertEquals("Position should still be " + numRecords,
                 10, writer.getPositionWithinLogSegment());
-        List<DLSN> dlsns = Await.result(Future.collect(futureList));
+        List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList));
         assertEquals("All records should be written",
                 numRecords, dlsns.size());
         for (int i = 0; i < numRecords; i++) {
@@ -293,10 +290,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
         ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
-        List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+        List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
         for (int i = 0; i < numRecords; i++) {
             futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
         }
@@ -310,7 +307,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 10, writer.getPositionWithinLogSegment());
         // close the writer should flush buffered data and release lock
         abortWriterAndLock(writer, lock);
-        Await.result(lockFuture0);
+        Utils.ioResult(lockFuture0);
         lock0.checkOwnership();
         assertEquals("Last tx id should still be " + (numRecords - 1),
                 numRecords - 1, writer.getLastTxId());
@@ -323,7 +320,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
 
         for (int i = 0; i < numRecords; i++) {
             try {
-                Await.result(futureList.get(i));
+                Utils.ioResult(futureList.get(i));
                 fail("Should be aborted record " + i + " with transmit exception");
             } catch (WriteCancelledException wce) {
                 assertTrue("Record " + i + " should be aborted because of ledger fenced",
@@ -369,10 +366,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
         ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
-        List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+        List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
         for (int i = 0; i < numRecords; i++) {
             futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
         }
@@ -393,7 +390,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
             assertEquals("Inconsistent rc is thrown",
                     rcToFailComplete, bkte.getBKResultCode());
         }
-        Await.result(lockFuture0);
+        Utils.ioResult(lockFuture0);
         lock0.checkOwnership();
         assertEquals("Last tx id should still be " + (numRecords - 1),
                 numRecords - 1, writer.getLastTxId());
@@ -406,7 +403,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
 
         for (int i = 0; i < numRecords; i++) {
             try {
-                Await.result(futureList.get(i));
+                Utils.ioResult(futureList.get(i));
                 fail("Should be aborted record " + i + " with transmit exception");
             } catch (WriteCancelledException wce) {
                 assertTrue("Record " + i + " should be aborted because of ledger fenced",
@@ -441,10 +438,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
         ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
-        List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+        List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
         for (int i = 0; i < numRecords; i++) {
             futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
         }
@@ -467,7 +464,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                     BKException.Code.LedgerFencedException, bkte.getBKResultCode());
         }
 
-        Await.result(lockFuture0);
+        Utils.ioResult(lockFuture0);
         lock0.checkOwnership();
 
         assertEquals("Last tx id should still be " + (numRecords - 1),
@@ -481,7 +478,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
 
         for (int i = 0; i < numRecords; i++) {
             try {
-                Await.result(futureList.get(i));
+                Utils.ioResult(futureList.get(i));
                 fail("Should be aborted record " + i + " with transmit exception");
             } catch (BKTransmitException bkte) {
                 assertEquals("Record " + i + " should be aborted",
@@ -513,10 +510,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
         ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
-        List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+        List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
         for (int i = 0; i < numRecords; i++) {
             futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
         }
@@ -530,23 +527,19 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 numRecords, writer.getPositionWithinLogSegment());
 
         final CountDownLatch deferLatch = new CountDownLatch(1);
-        writer.getFuturePool().apply(new AbstractFunction0<Object>() {
-            @Override
-            public Object apply() {
-                try {
-                    deferLatch.await();
-                } catch (InterruptedException e) {
-                    LOG.warn("Interrupted on deferring completion : ", e);
-                }
-                return null;
+        writer.getFuturePool().submit(() -> {
+            try {
+                deferLatch.await();
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted on deferring completion : ", e);
             }
         });
 
         // transmit the buffered data
-        FutureUtils.result(writer.flush());
+        Utils.ioResult(writer.flush());
 
         // add another 10 records
-        List<Future<DLSN>> anotherFutureList = new ArrayList<Future<DLSN>>(numRecords);
+        List<CompletableFuture<DLSN>> anotherFutureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
         for (int i = numRecords; i < 2 * numRecords; i++) {
             anotherFutureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
         }
@@ -562,13 +555,13 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         // abort the writer: it waits for outstanding transmits and abort buffered data
         abortWriterAndLock(writer, lock);
 
-        Await.result(lockFuture0);
+        Utils.ioResult(lockFuture0);
         lock0.checkOwnership();
 
         // release defer latch so completion would go through
         deferLatch.countDown();
 
-        List<DLSN> dlsns = Await.result(Future.collect(futureList));
+        List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList));
         assertEquals("All first 10 records should be written",
                 numRecords, dlsns.size());
         for (int i = 0; i < numRecords; i++) {
@@ -582,7 +575,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         }
         for (int i = 0; i < numRecords; i++) {
             try {
-                Await.result(anotherFutureList.get(i));
+                Utils.ioResult(anotherFutureList.get(i));
                 fail("Should be aborted record " + (numRecords + i) + " with transmit exception");
             } catch (WriteCancelledException wce) {
                 // writes should be cancelled.
@@ -622,7 +615,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // add 10 records
         int numRecords = 10;
-        List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+        List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
         for (int i = 0; i < numRecords; i++) {
             futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
         }
@@ -639,7 +632,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         // close the writer to flush the output buffer
         closeWriterAndLock(writer, lock);
 
-        List<DLSN> dlsns = Await.result(Future.collect(futureList));
+        List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList));
         assertEquals("All 11 records should be written",
                 numRecords + 1, dlsns.size());
         for (int i = 0; i < numRecords; i++) {
@@ -687,10 +680,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
 
         // close the writer
         closeWriterAndLock(writer, lock);
-        FutureUtils.result(writer.asyncClose());
+        Utils.ioResult(writer.asyncClose());
 
         try {
-            Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1)));
+            Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1)));
             fail("Should fail the write if the writer is closed");
         } catch (WriteException we) {
             // expected
@@ -713,10 +706,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
 
-        FutureUtils.result(writer.markEndOfStream());
+        Utils.ioResult(writer.markEndOfStream());
 
         try {
-            Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1)));
+            Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1)));
             fail("Should fail the write if the writer is marked as end of stream");
         } catch (EndOfStreamException we) {
             // expected
@@ -747,7 +740,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         LogRecord record = DLMTestUtil.getLogRecordInstance(1);
         record.setControl();
         try {
-            Await.result(writer.asyncWrite(record));
+            Utils.ioResult(writer.asyncWrite(record));
             fail("Should fail the writer if the log segment is already fenced");
         } catch (BKTransmitException bkte) {
             // expected
@@ -755,7 +748,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         }
 
         try {
-            Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2)));
+            Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2)));
             fail("Should fail the writer if the log segment is already fenced");
         } catch (WriteException we) {
             // expected
@@ -781,7 +774,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
 
         assertEquals(DLSN.InvalidDLSN,
-                Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2))));
+                Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2))));
         assertEquals(-1L, ((BKLogSegmentEntryWriter) writer.getEntryWriter())
                 .getLedgerHandle().getLastAddPushed());
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
index 2566d34..c0f208f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
@@ -17,12 +17,13 @@
  */
 package org.apache.distributedlog;
 
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.bk.LedgerAllocator;
 import org.apache.distributedlog.bk.LedgerAllocatorPool;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.Utils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -58,7 +59,7 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase {
         confLocal.setLedgerAllocatorPoolName("test-allocator-pool");
 
         BKDistributedLogNamespace namespace = (BKDistributedLogNamespace)
-                DistributedLogNamespaceBuilder.newBuilder()
+                NamespaceBuilder.newBuilder()
                         .conf(confLocal)
                         .uri(uri)
                         .build();
@@ -66,8 +67,8 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase {
         FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber,
                 FailpointUtils.FailPointActions.FailPointAction_Throw);
         try {
-            AsyncLogWriter writer =  FutureUtils.result(dlm.openAsyncLogWriter());
-            FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+            AsyncLogWriter writer =  Utils.ioResult(dlm.openAsyncLogWriter());
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
             fail("Should fail opening the writer");
         } catch (IOException ioe) {
             // expected
@@ -82,7 +83,7 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase {
         LedgerAllocatorPool allocatorPool = (LedgerAllocatorPool) allocator;
         assertEquals(0, allocatorPool.obtainMapSize());
 
-        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
         writer.write(DLMTestUtil.getLogRecordInstance(1L));
         Utils.close(writer);
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java
index bb8503f..07f0db5 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java
@@ -17,6 +17,8 @@
  */
 package org.apache.distributedlog;
 
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
index a766d3e..5e4ba07 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
@@ -21,6 +21,9 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Ticker;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
 import org.apache.distributedlog.injector.AsyncFailureInjector;
@@ -29,14 +32,13 @@ import org.apache.distributedlog.io.AsyncCloseable;
 import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitLimiter;
-import org.apache.distributedlog.util.SchedulerUtils;
-import com.twitter.util.Future;
+import org.apache.distributedlog.common.util.PermitLimiter;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
@@ -171,7 +173,7 @@ public class TestDistributedLogBase {
             throws Exception {
         URI uri = createDLMURI("/" + name);
         ensureURICreated(uri);
-        final DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        final Namespace namespace = NamespaceBuilder.newBuilder()
                 .uri(uri)
                 .conf(conf)
                 .build();
@@ -181,14 +183,14 @@ public class TestDistributedLogBase {
                 .build();
         AsyncCloseable resourcesCloseable = new AsyncCloseable() {
             @Override
-            public Future<Void> asyncClose() {
+            public CompletableFuture<Void> asyncClose() {
                 LOG.info("Shutting down the scheduler");
                 SchedulerUtils.shutdownScheduler(scheduler, 1, TimeUnit.SECONDS);
                 LOG.info("Shut down the scheduler");
                 LOG.info("Closing the namespace");
                 namespace.close();
                 LOG.info("Closed the namespace");
-                return Future.Void();
+                return FutureUtils.Void();
             }
         };
         AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder()
@@ -217,20 +219,20 @@ public class TestDistributedLogBase {
                 Optional.of(resourcesCloseable));
     }
 
-    protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogNamespace namespace)
+    protected LogSegmentMetadataStore getLogSegmentMetadataStore(Namespace namespace)
             throws IOException {
         return namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.READER)
                 .getLogSegmentMetadataStore();
     }
 
-    protected ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) throws Exception {
+    protected ZooKeeperClient getZooKeeperClient(Namespace namespace) throws Exception {
         NamespaceDriver driver = namespace.getNamespaceDriver();
         assertTrue(driver instanceof BKNamespaceDriver);
         return ((BKNamespaceDriver) driver).getWriterZKC();
     }
 
     @SuppressWarnings("deprecation")
-    protected BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) throws Exception {
+    protected BookKeeperClient getBookKeeperClient(Namespace namespace) throws Exception {
         NamespaceDriver driver = namespace.getNamespaceDriver();
         assertTrue(driver instanceof BKNamespaceDriver);
         return ((BKNamespaceDriver) driver).getReaderBKC();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
index 6d8bd0c..30ef481 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
@@ -17,19 +17,17 @@
  */
 package org.apache.distributedlog;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.Entry.Reader;
 import org.apache.distributedlog.Entry.Writer;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
 import org.apache.distributedlog.io.Buffer;
 import org.apache.distributedlog.io.CompressionCodec;
-import com.twitter.io.Buf;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.Utils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -80,7 +78,7 @@ public class TestEntry {
 
         LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]);
         try {
-            writer.writeRecord(largeRecord, new Promise<DLSN>());
+            writer.writeRecord(largeRecord, new CompletableFuture<DLSN>());
             Assert.fail("Should fail on writing large record");
         } catch (LogRecordTooLongException lrtle) {
             // expected
@@ -103,12 +101,12 @@ public class TestEntry {
         assertEquals("zero bytes", 0, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
 
-        List<Future<DLSN>> writePromiseList = Lists.newArrayList();
+        List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
         // write first 5 records
         for (int i = 0; i < 5; i++) {
             LogRecord record = new LogRecord(i, ("record-" + i).getBytes(UTF_8));
             record.setPositionWithinLogSegment(i);
-            Promise<DLSN> writePromise = new Promise<DLSN>();
+            CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
             writer.writeRecord(record, writePromise);
             writePromiseList.add(writePromise);
             assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords());
@@ -117,7 +115,7 @@ public class TestEntry {
         // write large record
         LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]);
         try {
-            writer.writeRecord(largeRecord, new Promise<DLSN>());
+            writer.writeRecord(largeRecord, new CompletableFuture<DLSN>());
             Assert.fail("Should fail on writing large record");
         } catch (LogRecordTooLongException lrtle) {
             // expected
@@ -128,7 +126,7 @@ public class TestEntry {
         for (int i = 0; i < 5; i++) {
             LogRecord record = new LogRecord(i + 5, ("record-" + (i + 5)).getBytes(UTF_8));
             record.setPositionWithinLogSegment(i + 5);
-            Promise<DLSN> writePromise = new Promise<DLSN>();
+            CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
             writer.writeRecord(record, writePromise);
             writePromiseList.add(writePromise);
             assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords());
@@ -138,7 +136,7 @@ public class TestEntry {
 
         // Test transmit complete
         writer.completeTransmit(1L, 1L);
-        List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
+        List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
         for (int i = 0; i < 10; i++) {
             Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
         }
@@ -175,23 +173,23 @@ public class TestEntry {
         assertEquals("zero bytes", 0, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
 
-        List<Future<DLSN>> writePromiseList = Lists.newArrayList();
+        List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
         // write first 5 records
         for (int i = 0; i < 5; i++) {
             LogRecord record = new LogRecord(i, ("record-" + i).getBytes(UTF_8));
             record.setPositionWithinLogSegment(i);
-            Promise<DLSN> writePromise = new Promise<DLSN>();
+            CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
             writer.writeRecord(record, writePromise);
             writePromiseList.add(writePromise);
             assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords());
         }
 
         final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(1024, CompressionCodec.Type.NONE);
-        List<Future<DLSN>> recordSetPromiseList = Lists.newArrayList();
+        List<CompletableFuture<DLSN>> recordSetPromiseList = Lists.newArrayList();
         // write another 5 records as a batch
         for (int i = 0; i < 5; i++) {
             ByteBuffer record = ByteBuffer.wrap(("record-" + (i + 5)).getBytes(UTF_8));
-            Promise<DLSN> writePromise = new Promise<DLSN>();
+            CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
             recordSetWriter.writeRecord(record, writePromise);
             recordSetPromiseList.add(writePromise);
             assertEquals((i + 1) + " records", (i + 1), recordSetWriter.getNumRecords());
@@ -202,8 +200,8 @@ public class TestEntry {
         LogRecord setRecord = new LogRecord(5L, data);
         setRecord.setPositionWithinLogSegment(5);
         setRecord.setRecordSet();
-        Promise<DLSN> writePromise = new Promise<DLSN>();
-        writePromise.addEventListener(new FutureEventListener<DLSN>() {
+        CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
+        writePromise.whenComplete(new FutureEventListener<DLSN>() {
             @Override
             public void onSuccess(DLSN dlsn) {
                 recordSetWriter.completeTransmit(
@@ -224,7 +222,7 @@ public class TestEntry {
         for (int i = 0; i < 5; i++) {
             LogRecord record = new LogRecord(i + 10, ("record-" + (i + 10)).getBytes(UTF_8));
             record.setPositionWithinLogSegment(i + 10);
-            writePromise = new Promise<DLSN>();
+            writePromise = new CompletableFuture<DLSN>();
             writer.writeRecord(record, writePromise);
             writePromiseList.add(writePromise);
             assertEquals((i + 11) + " records", (i + 11), writer.getNumRecords());
@@ -234,7 +232,7 @@ public class TestEntry {
 
         // Test transmit complete
         writer.completeTransmit(1L, 1L);
-        List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
+        List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
         for (int i = 0; i < 5; i++) {
             Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
         }
@@ -242,7 +240,7 @@ public class TestEntry {
         for (int i = 0; i < 5; i++) {
             Assert.assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i));
         }
-        List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetPromiseList));
+        List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetPromiseList));
         for (int i = 0; i < 5; i++) {
             Assert.assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i));
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java
index fd3c4ee..c111baf 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java
@@ -17,7 +17,9 @@
  */
 package org.apache.distributedlog;
 
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.util.Utils;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,11 +83,11 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
         BKAsyncLogWriter writer1 = dlmwrite1.startAsyncLogSegmentNonPartitioned();
         for (long j = 1; j <= 4; j++) {
             for (int k = 1; k <= 10; k++) {
-                FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
-                FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
             }
-            FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
-            FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
             if (null == reader0) {
                 reader0 = dlmreader0.getInputStream(1);
             }
@@ -124,13 +126,13 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
                 writer1.setForceRolling(true);
             }
             for (int k = 1; k <= 2; k++) {
-                FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
-                FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                 writer0.setForceRolling(false);
                 writer1.setForceRolling(false);
             }
-            FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
-            FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
             LOG.info("Completed {} write", j);
             if (null == reader0) {
                 reader0 = dlmreader0.getInputStream(1);
@@ -170,13 +172,13 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
                     writer0.setForceRolling(true);
                     writer1.setForceRolling(true);
                 }
-                FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
-                FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                 writer0.setForceRolling(false);
                 writer1.setForceRolling(false);
             }
-            FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
-            FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
             if (null == reader0) {
                 reader0 = dlmreader0.getInputStream(1);
             }
@@ -212,9 +214,9 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
                     writer1.setForceRolling(true);
                     writer1.overRideMinTimeStampToKeep(retentionPeriodOverride);
                 }
-                DLSN dlsn1 = FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                DLSN dlsn1 = Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
                 LOG.info("writer1 write record {}", dlsn1);
-                DLSN dlsn0 = FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                DLSN dlsn0 = Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                 LOG.info("writer0 write record {}", dlsn0);
                 if (k == 5) {
                     writer0.setForceRolling(false);
@@ -223,8 +225,8 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
                 }
                 Thread.sleep(5);
             }
-            FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
-            FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
         }
         writer0.close();
         writer1.close();
@@ -264,15 +266,15 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
                     writer0.setForceRecovery(true);
                     writer1.setForceRecovery(true);
                 }
-                DLSN dlsn1 = FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                DLSN dlsn1 = Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
                 LOG.info("writer1 write record {} - txid = {}", dlsn1, txid-1);
-                DLSN dlsn0 = FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                DLSN dlsn0 = Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                 LOG.info("writer0 write record {} - txid = {}", dlsn0, txid-1);
                 writer0.setForceRecovery(false);
                 writer1.setForceRecovery(false);
             }
-            FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
-            FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
             if (null == reader0) {
                 reader0 = dlmreader0.getInputStream(1);
             }
@@ -313,13 +315,13 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
                 writer1.setForceRolling(true);
             }
             for (int k = 1; k <= 2; k++) {
-                FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
-                FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                 writer0.setForceRolling(false);
                 writer1.setForceRolling(false);
             }
-            FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
-            FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+            Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
             if (null == reader0) {
                 reader0 = dlmreader0.getInputStream(1);
             }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java
index 152e4d8..8bdf86d 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java
@@ -20,9 +20,10 @@ package org.apache.distributedlog;
 import java.net.URI;
 import java.util.List;
 
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.util.Await;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.util.Utils;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@ public class TestLogSegmentCreation extends TestDistributedLogBase {
                 .setImmediateFlushEnabled(true)
                 .setEnableLedgerAllocatorPool(true)
                 .setLedgerAllocatorPoolName("test");
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
         DistributedLogManager dlm = namespace.openLog(name);
         final int numSegments = 3;
@@ -68,7 +69,7 @@ public class TestLogSegmentCreation extends TestDistributedLogBase {
         writer2.closeAndComplete();
 
         try {
-            Await.result(writer1.write(DLMTestUtil.getLogRecordInstance(numSegments + 1)));
+            Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(numSegments + 1)));
             fail("Should fail on writing new log records.");
         } catch (Throwable t) {
             LOG.error("Failed to write entry : ", t);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java
index 31df059..39ffe85 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java
@@ -21,8 +21,7 @@ import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataBuilder;
 import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion;
 import org.apache.distributedlog.LogSegmentMetadata.TruncationStatus;
 import org.apache.distributedlog.exceptions.UnsupportedMetadataVersionException;
-
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.Utils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,7 +63,7 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase {
         LogSegmentMetadata metadata1 = new LogSegmentMetadataBuilder("/metadata1",
             LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION, 1000, 1).setRegionId(TEST_REGION_ID).build();
         metadata1.write(zkc);
-        LogSegmentMetadata read1 = FutureUtils.result(LogSegmentMetadata.read(zkc, "/metadata1"));
+        LogSegmentMetadata read1 = Utils.ioResult(LogSegmentMetadata.read(zkc, "/metadata1"));
         assertEquals(metadata1, read1);
         assertEquals(TEST_REGION_ID, read1.getRegionId());
     }
@@ -75,7 +74,7 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase {
             1, 1000, 1).setRegionId(TEST_REGION_ID).build();
         metadata1.write(zkc);
         // synchronous read
-        LogSegmentMetadata read1 = FutureUtils.result(LogSegmentMetadata.read(zkc, "/metadata2", true));
+        LogSegmentMetadata read1 = Utils.ioResult(LogSegmentMetadata.read(zkc, "/metadata2", true));
         assertEquals(read1.getLogSegmentId(), metadata1.getLogSegmentId());
         assertEquals(read1.getFirstTxId(), metadata1.getFirstTxId());
         assertEquals(read1.getLastTxId(), metadata1.getLastTxId());
@@ -90,7 +89,7 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase {
         metadata1.write(zkc);
         // synchronous read
         try {
-            LogSegmentMetadata read1 = FutureUtils.result(LogSegmentMetadata.read(zkc, "/metadata-failure"));
+            LogSegmentMetadata read1 = Utils.ioResult(LogSegmentMetadata.read(zkc, "/metadata-failure"));
             fail("The previous statement should throw an exception");
         } catch (UnsupportedMetadataVersionException e) {
             // Expected

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
index 8c01a5c..fcc3395 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
@@ -17,11 +17,12 @@
  */
 package org.apache.distributedlog;
 
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.exceptions.DLIllegalStateException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.metadata.LogMetadata;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.util.DLUtils;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -80,7 +81,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
                 .setImmediateFlushEnabled(true)
                 .setEnableLedgerAllocatorPool(true)
                 .setLedgerAllocatorPoolName("test");
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
+        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
 
         namespace.createLog(streamName);
         MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
@@ -111,7 +112,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
                 .setImmediateFlushEnabled(true)
                 .setEnableLedgerAllocatorPool(true)
                 .setLedgerAllocatorPoolName("test");
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
+        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
 
         namespace.createLog(streamName);
         MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
@@ -167,7 +168,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
                 .setImmediateFlushEnabled(true)
                 .setEnableLedgerAllocatorPool(true)
                 .setLedgerAllocatorPoolName("test");
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
+        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
 
         namespace.createLog(streamName);
         MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
@@ -223,7 +224,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
                 .setImmediateFlushEnabled(true)
                 .setEnableLedgerAllocatorPool(true)
                 .setLedgerAllocatorPoolName("test");
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
+        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
 
         namespace.createLog(streamName);
         DistributedLogManager dlm1 = namespace.openLog(streamName);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java
index 5bfbf45..2b02704 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java
@@ -21,9 +21,11 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
 import org.apache.distributedlog.exceptions.IdleReaderException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.Utils;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -229,7 +231,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
             BKAsyncLogWriter out = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
             for (long j = 1; j <= segmentSize; j++) {
                 LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-                FutureUtils.result(out.write(op));
+                Utils.ioResult(out.write(op));
                 numRecordsWritten++;
             }
             out.closeAndComplete();
@@ -237,7 +239,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
 
         BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
         String completedZNode = blplm.completedLedgerZNode(txid - segmentSize, txid - 1, 3);
-        LogSegmentMetadata metadata = FutureUtils.result(LogSegmentMetadata.read(zkClient, completedZNode));
+        LogSegmentMetadata metadata = Utils.ioResult(LogSegmentMetadata.read(zkClient, completedZNode));
         zkClient.get().delete(completedZNode, -1);
         LogSegmentMetadata metadataToChange =
                 metadata.mutator()
@@ -253,7 +255,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
             BKAsyncLogWriter out = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
             for (long j = 1; j <= segmentSize; j++) {
                 LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-                FutureUtils.result(out.write(op));
+                Utils.ioResult(out.write(op));
                 numRecordsWritten++;
             }
             out.closeAndComplete();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java
index 8f445c4..6c9e354 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java
@@ -18,8 +18,10 @@
 package org.apache.distributedlog;
 
 import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.Utils;
 import org.junit.Test;
 
@@ -93,8 +95,8 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
         DistributedLogManager dlmwrite = createNewDLM(confLocal, name);
 
         final AsyncLogWriter writer = dlmwrite.startAsyncLogSegmentNonPartitioned();
-        FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(0)));
-        FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1)));
         final AtomicInteger writeCount = new AtomicInteger(2);
 
         DistributedLogManager dlmread = createNewDLM(conf, name);
@@ -116,7 +118,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
                         while (running.get()) {
                             limiter.acquire();
                             long curTxId = txid++;
-                            dlsn = FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
+                            dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
                             writeCount.incrementAndGet();
                             if (curTxId % 1000 == 0) {
                                 LOG.info("writer write {}", curTxId);
@@ -126,7 +128,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
                         Utils.close(writer);
                     } catch (DLInterruptedException die) {
                         Thread.currentThread().interrupt();
-                    } catch (IOException e) {
+                    } catch (Exception e) {
 
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
index ac9984b..eda8eb2 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
@@ -20,16 +20,18 @@ package org.apache.distributedlog;
 import com.google.common.base.Optional;
 import com.google.common.base.Ticker;
 import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
 import org.apache.distributedlog.exceptions.DLIllegalStateException;
 import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
 import org.apache.distributedlog.injector.AsyncFailureInjector;
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.After;
@@ -130,14 +132,14 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
     }
 
     private void ensureOrderSchedulerEmpty(String streamName) throws Exception {
-        final Promise<Void> promise = new Promise<Void>();
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
         scheduler.submit(streamName, new Runnable() {
             @Override
             public void run() {
-                FutureUtils.setValue(promise, null);
+                FutureUtils.complete(promise, null);
             }
         });
-        FutureUtils.result(promise);
+        Utils.ioResult(promise);
     }
 
     void generateCompletedLogSegments(DistributedLogManager dlm,
@@ -153,12 +155,12 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
 
         long txid = startTxId;
         for (long i = 0; i < numCompletedSegments; i++) {
-            AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+            AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
             for (long j = 1; j <= segmentSize; j++) {
-                FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
                 LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid);
                 ctrlRecord.setControl();
-                FutureUtils.result(writer.write(ctrlRecord));
+                Utils.ioResult(writer.write(ctrlRecord));
             }
             Utils.close(writer);
         }
@@ -167,12 +169,12 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
     AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm,
                                               DistributedLogConfiguration conf,
                                               long segmentSize) throws Exception {
-        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
         for (long i = 1L; i <= segmentSize; i++) {
-            FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
             LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i);
             ctrlRecord.setControl();
-            FutureUtils.result(writer.write(ctrlRecord));
+            Utils.ioResult(writer.write(ctrlRecord));
         }
         return writer;
     }
@@ -325,8 +327,8 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
 
         // generate list of log segments
         generateCompletedLogSegments(dlm, 3, 3);
-        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
-        FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L)));
+        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+        Utils.ioResult(writer.truncate(new DLSN(2L, 1L, 0L)));
 
         List<LogSegmentMetadata> segments = dlm.getLogSegments();
 
@@ -382,8 +384,8 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
 
         // generate list of log segments
         generateCompletedLogSegments(dlm, 3, 2);
-        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
-        FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L)));
+        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+        Utils.ioResult(writer.truncate(new DLSN(2L, 1L, 0L)));
 
         List<LogSegmentMetadata> segments = dlm.getLogSegments();
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java
index 029e872..efc9ac6 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java
@@ -18,16 +18,15 @@
 package org.apache.distributedlog;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.logsegment.LogSegmentFilter;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,7 +46,7 @@ public class TestReadUtils extends TestDistributedLogBase {
     @Rule
     public TestName runtime = new TestName();
 
-    private Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
+    private CompletableFuture<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
             BKDistributedLogManager bkdlm, int logsegmentIdx, long transactionId) throws Exception {
         List<LogSegmentMetadata> logSegments = bkdlm.getLogSegments();
         return ReadUtils.getLogRecordNotLessThanTxId(
@@ -60,7 +59,7 @@ public class TestReadUtils extends TestDistributedLogBase {
         );
     }
 
-    private Future<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception {
+    private CompletableFuture<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception {
         List<LogSegmentMetadata> ledgerList = bkdlm.getLogSegments();
         return ReadUtils.asyncReadFirstUserRecord(
                 bkdlm.getStreamName(), ledgerList.get(ledgerNo), 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1),
@@ -68,9 +67,9 @@ public class TestReadUtils extends TestDistributedLogBase {
         );
     }
 
-    private Future<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception {
+    private CompletableFuture<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception {
         BKLogReadHandler readHandler = bkdlm.createReadHandler();
-        List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+        List<LogSegmentMetadata> ledgerList = Utils.ioResult(
                 readHandler.readLogSegmentsFromStore(
                         LogSegmentMetadata.COMPARATOR,
                         LogSegmentFilter.DEFAULT_FILTER,
@@ -89,8 +88,8 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1 /* txid */);
 
         DLSN dlsn = new DLSN(1,0,0);
-        Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
-        LogRecordWithDLSN logrec = Await.result(futureLogrec);
+        CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+        LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
         assertEquals("should be an exact match", dlsn, logrec.getDlsn());
         bkdlm.close();
     }
@@ -102,8 +101,8 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1 /* txid */);
 
         DLSN dlsn = new DLSN(1,1,0);
-        Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
-        LogRecordWithDLSN logrec = Await.result(futureLogrec);
+        CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+        LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
         assertEquals("should be an exact match", dlsn, logrec.getDlsn());
         bkdlm.close();
     }
@@ -115,8 +114,8 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1 /* txid */);
 
         DLSN dlsn = new DLSN(1,0,1);
-        Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
-        LogRecordWithDLSN logrec = Await.result(futureLogrec);
+        CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+        LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
         assertEquals(new DLSN(1,1,0), logrec.getDlsn());
         bkdlm.close();
     }
@@ -128,8 +127,8 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5 /* user recs */ , 1 /* txid */);
 
         DLSN dlsn = new DLSN(2,0,0);
-        Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
-        LogRecordWithDLSN logrec = Await.result(futureLogrec);
+        CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+        LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
         assertEquals(null, logrec);
         bkdlm.close();
     }
@@ -144,8 +143,8 @@ public class TestReadUtils extends TestDistributedLogBase {
         txid += DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5 /* user recs */ , txid);
 
         DLSN dlsn = new DLSN(1,3,0);
-        Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 1, dlsn);
-        LogRecordWithDLSN logrec = Await.result(futureLogrec);
+        CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 1, dlsn);
+        LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
         assertEquals(new DLSN(2,0,0), logrec.getDlsn());
         bkdlm.close();
     }
@@ -157,8 +156,8 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 5 /* control recs */, 5, 1 /* txid */);
 
         DLSN dlsn = new DLSN(1,3,0);
-        Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
-        LogRecordWithDLSN logrec = Await.result(futureLogrec);
+        CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+        LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
         assertEquals(new DLSN(1,5,0), logrec.getDlsn());
         bkdlm.close();
     }
@@ -169,8 +168,8 @@ public class TestReadUtils extends TestDistributedLogBase {
         BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName);
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 5 /* control recs */, 5, 1 /* txid */);
 
-        Future<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
-        LogRecordWithDLSN logrec = Await.result(futureLogrec);
+        CompletableFuture<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
+        LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
         assertEquals(new DLSN(1,9,0), logrec.getDlsn());
         bkdlm.close();
     }
@@ -182,15 +181,15 @@ public class TestReadUtils extends TestDistributedLogBase {
 
         AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned();
         int txid = 1;
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true)));
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true)));
         Utils.close(out);
 
-        Future<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
-        LogRecordWithDLSN logrec = Await.result(futureLogrec);
+        CompletableFuture<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
+        LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
         assertEquals(new DLSN(1,2,0), logrec.getDlsn());
         bkdlm.close();
     }
@@ -201,8 +200,8 @@ public class TestReadUtils extends TestDistributedLogBase {
         BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName);
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 5 /* control recs */, 0, 1 /* txid */);
 
-        Future<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
-        LogRecordWithDLSN logrec = Await.result(futureLogrec);
+        CompletableFuture<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
+        LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
         assertEquals(null, logrec);
         bkdlm.close();
     }
@@ -259,7 +258,7 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 1, 1 /* txid */);
 
         Optional<LogRecordWithDLSN> result =
-                FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 999L));
+                Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 999L));
         assertFalse(result.isPresent());
     }
 
@@ -270,7 +269,7 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 1, 999L /* txid */);
 
         Optional<LogRecordWithDLSN> result =
-                FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 99L));
+                Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 99L));
         assertTrue(result.isPresent());
         assertEquals(999L, result.get().getTransactionId());
         assertEquals(0L, result.get().getDlsn().getEntryId());
@@ -284,7 +283,7 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 5, 1L /* txid */);
 
         Optional<LogRecordWithDLSN> result =
-                FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 3L));
+                Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 3L));
         assertTrue(result.isPresent());
         assertEquals(3L, result.get().getTransactionId());
     }
@@ -296,7 +295,7 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 100, 1L /* txid */);
 
         Optional<LogRecordWithDLSN> result =
-                FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 9L));
+                Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 9L));
         assertTrue(result.isPresent());
         assertEquals(9L, result.get().getTransactionId());
     }
@@ -308,7 +307,7 @@ public class TestReadUtils extends TestDistributedLogBase {
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 100, 1L /* txid */, 3L);
 
         Optional<LogRecordWithDLSN> result =
-                FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 23L));
+                Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 23L));
         assertTrue(result.isPresent());
         assertEquals(25L, result.get().getTransactionId());
     }
@@ -321,22 +320,22 @@ public class TestReadUtils extends TestDistributedLogBase {
         long txid = 1L;
         for (int i = 0; i < 10; ++i) {
             LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
-            Await.result(out.write(record));
+            Utils.ioResult(out.write(record));
             txid += 1;
         }
         long txidToSearch = txid;
         for (int i = 0; i < 10; ++i) {
             LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txidToSearch);
-            Await.result(out.write(record));
+            Utils.ioResult(out.write(record));
         }
         for (int i = 0; i < 10; ++i) {
             LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
-            Await.result(out.write(record));
+            Utils.ioResult(out.write(record));
             txid += 1;
         }
         Utils.close(out);
         Optional<LogRecordWithDLSN> result =
-                FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, txidToSearch));
+                Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, txidToSearch));
         assertTrue(result.isPresent());
         assertEquals(10L, result.get().getDlsn().getEntryId());
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java
index ad5bf8e..8d9f846 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java
@@ -17,13 +17,13 @@
  */
 package org.apache.distributedlog;
 
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
@@ -140,8 +140,8 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> {
     }
 
     private void readNext() {
-        Future<LogRecordWithDLSN> record = reader.readNext();
-        record.addEventListener(this);
+        CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
+        record.whenComplete(this);
     }
 
     @Override
@@ -184,12 +184,8 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> {
 
     private void closeReader() {
         if (null != reader) {
-            reader.asyncClose().onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(Throwable cause) {
-                    LOG.warn("Exception on closing reader {} : ", readerName, cause);
-                    return BoxedUnit.UNIT;
-                }
+            reader.asyncClose().whenComplete((value, cause) -> {
+                LOG.warn("Exception on closing reader {} : ", readerName, cause);
             });
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
index 9032866..0111e4d 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
@@ -20,14 +20,16 @@ package org.apache.distributedlog;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.feature.CoreFeatureKeys;
 import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryReader;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.feature.SettableFeature;
@@ -35,9 +37,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.distributedlog.annotations.DistributedLogAnnotations.FlakyTest;
-import com.twitter.util.Await;
-import com.twitter.util.FutureEventListener;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations.FlakyTest;
 
 import static com.google.common.base.Charsets.UTF_8;
 import static org.junit.Assert.*;
@@ -79,7 +79,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         // send requests in parallel
         for (int i = 1; i <= numEntries; i++) {
             final int entryId = i;
-            writer.write(DLMTestUtil.getLogRecordInstance(entryId)).addEventListener(new FutureEventListener<DLSN>() {
+            writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() {
 
                 @Override
                 public void onSuccess(DLSN value) {
@@ -125,7 +125,9 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         // send requests in parallel to have outstanding requests
         for (int i = 1; i <= numEntries; i++) {
             final int entryId = i;
-            Future<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).addEventListener(new FutureEventListener<DLSN>() {
+            CompletableFuture<DLSN> writeFuture =
+                writer.write(DLMTestUtil.getLogRecordInstance(entryId))
+                    .whenComplete(new FutureEventListener<DLSN>() {
 
                 @Override
                 public void onSuccess(DLSN value) {
@@ -146,7 +148,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
             });
             if (i == 1) {
                 // wait for first log segment created
-                FutureUtils.result(writeFuture);
+                Utils.ioResult(writeFuture);
             }
         }
         latch.await();
@@ -191,7 +193,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         long txId = 1L;
 
         // Create Log Segments
-        Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
 
         FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate,
                 FailpointUtils.FailPointActions.FailPointAction_Throw);
@@ -201,7 +203,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
             final int numRecords = 10;
             final CountDownLatch latch = new CountDownLatch(numRecords);
             for (int i = 0; i < numRecords; i++) {
-                writer.write(DLMTestUtil.getLogRecordInstance(++txId)).addEventListener(new FutureEventListener<DLSN>() {
+                writer.write(DLMTestUtil.getLogRecordInstance(++txId)).whenComplete(new FutureEventListener<DLSN>() {
                     @Override
                     public void onSuccess(DLSN value) {
                         logger.info("Completed entry : {}.", value);
@@ -266,7 +268,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         // send requests in parallel to have outstanding requests
         for (int i = 1; i <= numLogSegments; i++) {
             final int entryId = i;
-            Future<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).addEventListener(new FutureEventListener<DLSN>() {
+            CompletableFuture<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() {
                 @Override
                 public void onSuccess(DLSN value) {
                     logger.info("Completed entry {} : {}.", entryId, value);
@@ -279,7 +281,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
             });
             if (i == 1) {
                 // wait for first log segment created
-                FutureUtils.result(writeFuture);
+                Utils.ioResult(writeFuture);
             }
         }
         latch.await();
@@ -297,7 +299,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         // writer should work after rolling log segments
         // there would be (numLogSegments/2) segments based on current rolling policy
         for (int i = 1; i <= numLogSegments; i++) {
-            DLSN newDLSN = Await.result(writer.write(DLMTestUtil.getLogRecordInstance(numLogSegments + i)));
+            DLSN newDLSN = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(numLogSegments + i)));
             logger.info("Completed entry {} : {}", numLogSegments + i, newDLSN);
         }
 
@@ -364,7 +366,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
 
         // 2) reader should be able to read 5 entries.
         for (long i = 1; i <= numEntries; i++) {
-            LogRecordWithDLSN record = Await.result(reader.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
             DLMTestUtil.verifyLogRecord(record);
             assertEquals(i, record.getTransactionId());
             assertEquals(record.getTransactionId() - 1, record.getSequenceId());
@@ -418,7 +420,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         anotherWriter.closeAndComplete();
 
         for (long i = numEntries + 1; i <= numEntries + 3; i++) {
-            LogRecordWithDLSN record = Await.result(reader.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
             DLMTestUtil.verifyLogRecord(record);
             assertEquals(i, record.getTransactionId());
         }