You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:39 UTC

[07/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
index 2f5766d..5123178 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
@@ -22,6 +22,7 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -30,16 +31,22 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
-import org.apache.distributedlog.config.ConcurrentBaseConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.BKTransmitException;
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAccessor;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -63,15 +70,10 @@ import org.apache.distributedlog.exceptions.OverCapacityException;
 import org.apache.distributedlog.exceptions.ReadCancelledException;
 import org.apache.distributedlog.exceptions.WriteException;
 import org.apache.distributedlog.lock.DistributedLock;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.SimplePermitLimiter;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
 
 import junit.framework.Assert;
 import static com.google.common.base.Charsets.UTF_8;
@@ -111,13 +113,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         for (long i = 0; i < 3; i++) {
             final long currentLogSegmentSeqNo = i + 1;
             BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
-            DLSN dlsn = Await.result(writer.writeControlRecord(new LogRecord(txid++, "control".getBytes(UTF_8))));
+            DLSN dlsn = Utils.ioResult(writer.writeControlRecord(new LogRecord(txid++, "control".getBytes(UTF_8))));
             assertEquals(currentLogSegmentSeqNo, dlsn.getLogSegmentSequenceNo());
             assertEquals(0, dlsn.getEntryId());
             assertEquals(0, dlsn.getSlotId());
             for (long j = 1; j < 10; j++) {
                 final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
-                Await.result(writer.write(record));
+                Utils.ioResult(writer.write(record));
             }
             writer.closeAndComplete();
         }
@@ -161,8 +163,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         // Write one record larger than max seg size. Ledger doesn't roll until next write.
         int txid = 1;
         LogRecord record = DLMTestUtil.getLogRecordInstance(txid++, 2048);
-        Future<DLSN> result = writer.write(record);
-        DLSN dlsn = Await.result(result, Duration.fromSeconds(10));
+        CompletableFuture<DLSN> result = writer.write(record);
+        DLSN dlsn = Utils.ioResult(result, 10, TimeUnit.SECONDS);
         assertEquals(1, dlsn.getLogSegmentSequenceNo());
 
         record = DLMTestUtil.getLogRecordInstance(txid++, MAX_LOGRECORD_SIZE + 1);
@@ -207,8 +209,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             for (long j = 0; j < numRecordsPerLogSegment; j++) {
                 final long currentEntryId = j;
                 final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
-                Future<DLSN> dlsnFuture = writer.write(record);
-                dlsnFuture.addEventListener(new FutureEventListener<DLSN>() {
+                CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+                dlsnFuture.whenComplete(new FutureEventListener<DLSN>() {
                     @Override
                     public void onSuccess(DLSN value) {
                         if(value.getLogSegmentSequenceNo() != currentLogSegmentSeqNo) {
@@ -245,7 +247,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         assertEquals("Last DLSN" + last.getDlsn() + " isn't the maximum DLSN " + maxDLSN.get(),
                 last.getDlsn(), maxDLSN.get());
         assertEquals(last.getDlsn(), dlm.getLastDLSN());
-        assertEquals(last.getDlsn(), Await.result(dlm.getLastDLSNAsync()));
+        assertEquals(last.getDlsn(), Utils.ioResult(dlm.getLastDLSNAsync()));
         DLMTestUtil.verifyLargeLogRecord(last);
 
         dlm.close();
@@ -330,8 +332,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
                                  final CountDownLatch syncLatch,
                                  final CountDownLatch completionLatch,
                                  final AtomicBoolean errorsFound) {
-        Future<LogRecordWithDLSN> record = reader.readNext();
-        record.addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+        CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
+        record.whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
             @Override
             public void onSuccess(LogRecordWithDLSN value) {
                 try {
@@ -455,7 +457,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             if (expectedTxID == numLogSegments * numRecordsPerLogSegment) {
                 break;
             }
-            List<LogRecordWithDLSN> records = Await.result(reader.readBulk(20));
+            List<LogRecordWithDLSN> records = Utils.ioResult(reader.readBulk(20));
             LOG.info("Bulk read {} entries.", records.size());
 
             assertTrue(records.size() >= 1);
@@ -495,7 +497,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         for (long i = 0; i < 3; i++) {
             // since we batched 20 entries into single bookkeeper entry
             // we should be able to read 20 entries as a batch.
-            List<LogRecordWithDLSN> records = Await.result(reader.readBulk(20));
+            List<LogRecordWithDLSN> records = Utils.ioResult(reader.readBulk(20));
             assertEquals(20, records.size());
             for (LogRecordWithDLSN record : records) {
                 assertEquals(expectedTxID, record.getTransactionId());
@@ -531,7 +533,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
                 name, asyncReader.getStreamName());
         long numTrans = 0;
         DLSN lastDLSN = DLSN.InvalidDLSN;
-        LogRecordWithDLSN record = Await.result(asyncReader.readNext());
+        LogRecordWithDLSN record = Utils.ioResult(asyncReader.readNext());
         while (null != record) {
             DLMTestUtil.verifyEmptyLogRecord(record);
             assertEquals(0, record.getDlsn().getSlotId());
@@ -541,7 +543,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             if (numTrans >= (txid - 1)) {
                 break;
             }
-            record = Await.result(asyncReader.readNext());
+            record = Utils.ioResult(asyncReader.readNext());
         }
         assertEquals((txid - 1), numTrans);
         Utils.close(asyncReader);
@@ -715,8 +717,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             for (long j = 0; j < 10; j++) {
                 final long currentEntryId = j;
                 final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
-                Future<DLSN> dlsnFuture = writer.write(record);
-                dlsnFuture.addEventListener(new WriteFutureEventListener(
+                CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+                dlsnFuture.whenComplete(new WriteFutureEventListener(
                         record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
                 if (i == 0 && j == 0) {
                     boolean monotonic = LogSegmentMetadata.supportsSequenceId(logSegmentVersion);
@@ -793,8 +795,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             for (long j = 0; j < 10; j++) {
                 final long currentEntryId = j;
                 final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
-                Future<DLSN> dlsnFuture = writer.write(record);
-                dlsnFuture.addEventListener(new WriteFutureEventListener(
+                CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+                dlsnFuture.whenComplete(new WriteFutureEventListener(
                         record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
             }
             writer.closeAndComplete();
@@ -835,7 +837,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         URI uri = createDLMURI("/" + name);
         ensureURICreated(uri);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(confLocal).uri(uri).build();
         final DistributedLogManager[] dlms = new DistributedLogManager[count];
         final TestReader[] readers = new TestReader[count];
@@ -868,8 +870,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
                 final long currentEntryId = j;
                 final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
                 for (int s = 0; s < count; s++) {
-                    Future<DLSN> dlsnFuture = writers[s].write(record);
-                    dlsnFuture.addEventListener(new WriteFutureEventListener(
+                    CompletableFuture<DLSN> dlsnFuture = writers[s].write(record);
+                    dlsnFuture.whenComplete(new WriteFutureEventListener(
                             record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
                 }
             }
@@ -937,8 +939,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             for (long j = 0; j < numRecordsPerLogSegment; j++) {
                 final long currentEntryId = j;
                 final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
-                Future<DLSN> dlsnFuture = writer.write(record);
-                dlsnFuture.addEventListener(new WriteFutureEventListener(
+                CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+                dlsnFuture.whenComplete(new WriteFutureEventListener(
                         record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
             }
             writer.closeAndComplete();
@@ -988,8 +990,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             for (long j = 0; j < numRecordsPerLogSegment; j++) {
                 Thread.sleep(50);
                 final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
-                Future<DLSN> dlsnFuture = writer.write(record);
-                dlsnFuture.addEventListener(new WriteFutureEventListener(
+                CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+                dlsnFuture.whenComplete(new WriteFutureEventListener(
                         record, currentLogSegmentSeqNo, j, writeLatch, writeErrors, false));
                 if (i == 0 && j == 0) {
                     boolean monotonic = LogSegmentMetadata.supportsSequenceId(confLocal.getDLLedgerMetadataLayoutVersion());
@@ -1027,7 +1029,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         writer.closeAndComplete();
 
         final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN);
-        LogRecordWithDLSN record = Await.result(reader.readNext());
+        LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
         assertEquals(1L, record.getTransactionId());
         DLMTestUtil.verifyLogRecord(record);
 
@@ -1037,7 +1039,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             @Override
             public void run() {
                 try {
-                    Await.result(reader.readNext());
+                    Utils.ioResult(reader.readNext());
                 } catch (ReadCancelledException rce) {
                     receiveExpectedException.set(true);
                 } catch (Throwable t) {
@@ -1060,7 +1062,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         // closed reader should reject any readNext
         try {
-            Await.result(reader.readNext());
+            Utils.ioResult(reader.readNext());
             fail("Reader should reject readNext if it is closed.");
         } catch (ReadCancelledException rce) {
             // expected
@@ -1087,8 +1089,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         for (long i = 0; i < COUNT; i++) {
             Thread.sleep(1);
             final LogRecord record = DLMTestUtil.getLogRecordInstance(txid++);
-            Future<DLSN> dlsnFuture = writer.write(record);
-            dlsnFuture.addEventListener(new FutureEventListener<DLSN>() {
+            CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+            dlsnFuture.whenComplete(new FutureEventListener<DLSN>() {
                 @Override
                 public void onSuccess(DLSN value) {
                     syncLatch.countDown();
@@ -1142,10 +1144,10 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         URI uri = createDLMURI("/" + name);
         ensureURICreated(uri);
 
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(confLocal).uri(uri).clientId("gabbagoo").build();
         DistributedLogManager dlm = namespace.openLog(name);
-        DistributedLogNamespace namespace1 = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace1 = NamespaceBuilder.newBuilder()
                 .conf(confLocal).uri(uri).clientId("tortellini").build();
         DistributedLogManager dlm1 = namespace1.openLog(name);
 
@@ -1153,12 +1155,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
 
         // First write succeeds since lock isnt checked until transmit, which is scheduled
-        Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
         writer.flushAndCommit();
 
         BKLogSegmentWriter perStreamWriter = writer.getCachedLogWriter();
         DistributedLock lock = perStreamWriter.getLock();
-        FutureUtils.result(lock.asyncClose());
+        Utils.ioResult(lock.asyncClose());
 
         // Get second writer, steal lock
         BKAsyncLogWriter writer2 = (BKAsyncLogWriter)(dlm1.startAsyncLogSegmentNonPartitioned());
@@ -1169,7 +1171,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
             // Succeeds, kicks off scheduled flush
             Thread.sleep(100);
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
             fail("should have thrown");
         } catch (LockingException ex) {
             LOG.debug("caught exception ", ex);
@@ -1194,13 +1196,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             dlm = createNewDLM(confLocal, runtime.getMethodName());
         }
         BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
-        ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(1000);
+        ArrayList<CompletableFuture<DLSN>> results = new ArrayList<CompletableFuture<DLSN>>(1000);
         for (int i = 0; i < 1000; i++) {
             results.add(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
         }
-        for (Future<DLSN> result : results) {
+        for (CompletableFuture<DLSN> result : results) {
             try {
-                Await.result(result);
+                Utils.ioResult(result);
                 if (shouldFail) {
                     fail("should fail due to no outstanding writes permitted");
                 }
@@ -1242,12 +1244,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         confLocal.setOutstandingWriteLimitDarkmode(true);
         DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
         BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
-        ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(1000);
+        ArrayList<CompletableFuture<DLSN>> results = new ArrayList<CompletableFuture<DLSN>>(1000);
         for (int i = 0; i < 1000; i++) {
             results.add(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
         }
-        for (Future<DLSN> result : results) {
-            Await.result(result);
+        for (CompletableFuture<DLSN> result : results) {
+            Utils.ioResult(result);
         }
         writer.closeAndComplete();
         dlm.close();
@@ -1266,7 +1268,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         long txId = 1L;
         for (int i = 0; i < 5; i++) {
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
         }
 
         BKLogSegmentWriter logWriter = writer.getCachedLogWriter();
@@ -1277,7 +1279,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
                 BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8));
 
         try {
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
             fail("Should fail write to a fenced ledger with BKTransmitException");
         } catch (BKTransmitException bkte) {
             // expected
@@ -1310,7 +1312,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         long txId = 1L;
         for (int i = 0; i < 5; i++) {
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
         }
 
         BKLogSegmentWriter logWriter = writer.getCachedLogWriter();
@@ -1408,8 +1410,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         int recordCount = 0;
         try {
             while (true) {
-                Future<LogRecordWithDLSN> record = reader.readNext();
-                Await.result(record);
+                CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
+                Utils.ioResult(record);
                 recordCount++;
 
                 if (recordCount >= segmentSize * numSegments) {
@@ -1465,7 +1467,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         BKAsyncLogWriter writer =
                 (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
 
-        Await.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
         writer.abort();
 
         for (int i = 0; i < 2; i++) {
@@ -1548,8 +1550,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         int recordCount = 0;
         try {
             while (true) {
-                Future<LogRecordWithDLSN> record = reader.readNext();
-                Await.result(record);
+                CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
+                Utils.ioResult(record);
                 if (recordCount == 0) {
                     readLatch.countDown();
                 }
@@ -1582,7 +1584,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         int numRecords = 10;
         for (int i = 0; i < numRecords; i++) {
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
             assertEquals("last tx id should become " + i,
                     i, writer.getLastTxId());
         }
@@ -1612,16 +1614,16 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         int numRecords = 40;
         for (int i = 1; i <= numRecords; i++) {
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
             assertEquals("last tx id should become " + i,
                     i, writer.getLastTxId());
         }
         LogRecord record = DLMTestUtil.getLogRecordInstance(numRecords);
         record.setControl();
-        Await.result(writer.write(record));
+        Utils.ioResult(writer.write(record));
 
         BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
-        record = Await.result(reader.readNext());
+        record = Utils.ioResult(reader.readNext());
         LOG.info("Read record {}", record);
         assertEquals(1L, record.getTransactionId());
 
@@ -1629,7 +1631,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         assertTrue(reader.getReadAheadReader().getNumCachedEntries() <= maxAllowedCachedRecords);
 
         for (int i = 2; i <= numRecords; i++) {
-            record = Await.result(reader.readNext());
+            record = Utils.ioResult(reader.readNext());
             LOG.info("Read record {}", record);
             assertEquals((long) i, record.getTransactionId());
             TimeUnit.MILLISECONDS.sleep(20);
@@ -1656,18 +1658,18 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         final int NUM_RECORDS = 10;
         int i = 1;
         for (; i <= NUM_RECORDS; i++) {
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
             assertEquals("last tx id should become " + i,
                     i, writer.getLastTxId());
         }
 
-        Await.result(writer.markEndOfStream());
+        Utils.ioResult(writer.markEndOfStream());
 
         // Multiple end of streams are ok.
-        Await.result(writer.markEndOfStream());
+        Utils.ioResult(writer.markEndOfStream());
 
         try {
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
             fail("Should have thrown");
         } catch (EndOfStreamException ex) {
         }
@@ -1675,12 +1677,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
         LogRecord record = null;
         for (int j = 0; j < NUM_RECORDS; j++) {
-            record = Await.result(reader.readNext());
+            record = Utils.ioResult(reader.readNext());
             assertEquals(j+1, record.getTransactionId());
         }
 
         try {
-            record = Await.result(reader.readNext());
+            record = Utils.ioResult(reader.readNext());
             fail("Should have thrown");
         } catch (EndOfStreamException ex) {
         }
@@ -1698,9 +1700,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         DistributedLogManager dlm = createNewDLM(confLocal, name);
         BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
-        Await.result(writer.markEndOfStream());
+        Utils.ioResult(writer.markEndOfStream());
         try {
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(1)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1)));
             fail("Should have thrown");
         } catch (EndOfStreamException ex) {
         }
@@ -1708,7 +1710,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
         try {
-            LogRecord record = Await.result(reader.readNext());
+            LogRecord record = Utils.ioResult(reader.readNext());
             fail("Should have thrown");
         } catch (EndOfStreamException ex) {
         }
@@ -1726,32 +1728,32 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         DistributedLogManager dlm = createNewDLM(confLocal, name);
         BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
-        FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
         LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(1L);
         controlRecord.setControl();
-        FutureUtils.result(writer.write(controlRecord));
+        Utils.ioResult(writer.write(controlRecord));
 
         BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
-        Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-        Future<LogRecordWithDLSN> readFuture = reader.readNext();
+        CompletableFuture<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        CompletableFuture<LogRecordWithDLSN> readFuture = reader.readNext();
 
         // write another records
         for (int i = 0; i < 5; i++) {
             long txid = 2L + i;
-            FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid)));
             controlRecord = DLMTestUtil.getLogRecordInstance(txid);
             controlRecord.setControl();
-            FutureUtils.result(writer.write(controlRecord));
+            Utils.ioResult(writer.write(controlRecord));
         }
 
-        List<LogRecordWithDLSN> bulkReadRecords = FutureUtils.result(bulkReadFuture);
+        List<LogRecordWithDLSN> bulkReadRecords = Utils.ioResult(bulkReadFuture);
         assertEquals(2, bulkReadRecords.size());
         assertEquals(1L, bulkReadRecords.get(0).getTransactionId());
         assertEquals(2L, bulkReadRecords.get(1).getTransactionId());
         for (LogRecordWithDLSN record : bulkReadRecords) {
             DLMTestUtil.verifyLogRecord(record);
         }
-        LogRecordWithDLSN record = FutureUtils.result(readFuture);
+        LogRecordWithDLSN record = Utils.ioResult(readFuture);
         assertEquals(3L, record.getTransactionId());
         DLMTestUtil.verifyLogRecord(record);
 
@@ -1771,16 +1773,16 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         DistributedLogManager dlm = createNewDLM(confLocal, name);
         BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
-        FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
         LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(1L);
         controlRecord.setControl();
-        FutureUtils.result(writer.write(controlRecord));
+        Utils.ioResult(writer.write(controlRecord));
 
         BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
-        Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, 0, TimeUnit.MILLISECONDS);
-        Future<LogRecordWithDLSN> readFuture = reader.readNext();
+        CompletableFuture<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, 0, TimeUnit.MILLISECONDS);
+        CompletableFuture<LogRecordWithDLSN> readFuture = reader.readNext();
 
-        List<LogRecordWithDLSN> bulkReadRecords = FutureUtils.result(bulkReadFuture);
+        List<LogRecordWithDLSN> bulkReadRecords = Utils.ioResult(bulkReadFuture);
         assertEquals(1, bulkReadRecords.size());
         assertEquals(1L, bulkReadRecords.get(0).getTransactionId());
         for (LogRecordWithDLSN record : bulkReadRecords) {
@@ -1790,13 +1792,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         // write another records
         for (int i = 0; i < 5; i++) {
             long txid = 2L + i;
-            FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid)));
             controlRecord = DLMTestUtil.getLogRecordInstance(txid);
             controlRecord.setControl();
-            FutureUtils.result(writer.write(controlRecord));
+            Utils.ioResult(writer.write(controlRecord));
         }
 
-        LogRecordWithDLSN record = FutureUtils.result(readFuture);
+        LogRecordWithDLSN record = Utils.ioResult(readFuture);
         assertEquals(2L, record.getTransactionId());
         DLMTestUtil.verifyLogRecord(record);
 
@@ -1832,7 +1834,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         // 3 segments, 10 records each, immediate flush, batch size 1, so just the first
         // record in each ledger is discarded, for 30 - 3 = 27 records.
         for (int i = 0; i < 27; i++) {
-            LogRecordWithDLSN record = Await.result(reader.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
             assertFalse(record.getDlsn().getEntryId() % 10 == 0);
         }
 
@@ -1868,7 +1870,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             // 3 segments, 10 records each, immediate flush, batch size 1, so just the first
             // record in each ledger is discarded, for 30 - 3 = 27 records.
             for (int i = 0; i < 30; i++) {
-                LogRecordWithDLSN record = Await.result(reader.readNext());
+                LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
                 assertFalse(record.getDlsn().getEntryId() % 10 == 0);
             }
             fail("should have thrown");
@@ -1909,7 +1911,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         // 3. ranges 6-10, 7-11, 8-12, 9-13 will be bad
         // And so on, so 5 records in each 10 will be discarded, for 50 good records.
         for (int i = 0; i < 50; i++) {
-            LogRecordWithDLSN record = Await.result(reader.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
             assertFalse(record.getDlsn().getEntryId() % 10 == 0);
         }
 
@@ -1946,7 +1948,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         // 2. range 1-8 will be good, but only contain 4 records
         // And so on for the next segment, so 4 records in each segment, for 12 good records
         for (int i = 0; i < 12; i++) {
-            LogRecordWithDLSN record = Await.result(reader.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
             assertFalse(record.getDlsn().getEntryId() % 10 == 0);
         }
 
@@ -1970,13 +1972,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         URI uri = createDLMURI("/" + name);
         ensureURICreated(uri);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(confLocal).uri(uri).build();
 
         // use the pool
         DistributedLogManager dlm = namespace.openLog(name + "-pool");
         AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
-        FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
         List<LogSegmentMetadata> segments = dlm.getLogSegments();
         assertEquals(1, segments.size());
         long ledgerId = segments.get(0).getLogSegmentId();
@@ -1995,7 +1997,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
                 Optional.of(dynConf),
                 Optional.<StatsLogger>absent());
         writer = dlm.startAsyncLogSegmentNonPartitioned();
-        FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
         segments = dlm.getLogSegments();
         assertEquals(1, segments.size());
         ledgerId = segments.get(0).getLogSegmentId();
@@ -2023,17 +2025,17 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         DistributedLogManager dlm = createNewDLM(confLocal, name);
         BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
-        List<Future<DLSN>> writeFutures = Lists.newArrayList();
+        List<CompletableFuture<DLSN>> writeFutures = Lists.newArrayList();
         for (int i = 0; i < 5; i++) {
             LogRecord record = DLMTestUtil.getLogRecordInstance(1L + i);
             writeFutures.add(writer.write(record));
         }
-        List<Future<DLSN>> recordSetFutures = Lists.newArrayList();
+        List<CompletableFuture<DLSN>> recordSetFutures = Lists.newArrayList();
         // write another 5 records
         final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(4096, CompressionCodec.Type.LZ4);
         for (int i = 0; i < 5; i++) {
             LogRecord record = DLMTestUtil.getLogRecordInstance(6L + i);
-            Promise<DLSN> writePromise = new Promise<DLSN>();
+            CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
             recordSetWriter.writeRecord(ByteBuffer.wrap(record.getPayload()), writePromise);
             recordSetFutures.add(writePromise);
         }
@@ -2042,8 +2044,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         recordSetBuffer.get(data);
         LogRecord setRecord = new LogRecord(6L, data);
         setRecord.setRecordSet();
-        Future<DLSN> writeRecordSetFuture = writer.write(setRecord);
-        writeRecordSetFuture.addEventListener(new FutureEventListener<DLSN>() {
+        CompletableFuture<DLSN> writeRecordSetFuture = writer.write(setRecord);
+        writeRecordSetFuture.whenComplete(new FutureEventListener<DLSN>() {
             @Override
             public void onSuccess(DLSN dlsn) {
                 recordSetWriter.completeTransmit(
@@ -2058,20 +2060,20 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             }
         });
         writeFutures.add(writeRecordSetFuture);
-        FutureUtils.result(writeRecordSetFuture);
+        Utils.ioResult(writeRecordSetFuture);
         // write last 5 records
         for (int i = 0; i < 5; i++) {
             LogRecord record = DLMTestUtil.getLogRecordInstance(11L + i);
-            Future<DLSN> writeFuture = writer.write(record);
+            CompletableFuture<DLSN> writeFuture = writer.write(record);
             writeFutures.add(writeFuture);
             // make sure get log record count returns the right count
             if (i == 0) {
-                FutureUtils.result(writeFuture);
+                Utils.ioResult(writeFuture);
                 assertEquals(10, dlm.getLogRecordCount());
             }
         }
 
-        List<DLSN> writeResults = FutureUtils.result(Future.collect(writeFutures));
+        List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writeFutures));
 
         for (int i = 0; i < 5; i++) {
             Assert.assertEquals(new DLSN(1L, i, 0L), writeResults.get(i));
@@ -2080,12 +2082,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         for (int i = 0; i < 5; i++) {
             Assert.assertEquals(new DLSN(1L, 6L + i, 0L), writeResults.get(6 + i));
         }
-        List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetFutures));
+        List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetFutures));
         for (int i = 0; i < 5; i++) {
             Assert.assertEquals(new DLSN(1L, 5L, i), recordSetWriteResults.get(i));
         }
 
-        FutureUtils.result(writer.flushAndCommit());
+        Utils.ioResult(writer.flushAndCommit());
 
         DistributedLogConfiguration readConf1 = new DistributedLogConfiguration();
         readConf1.addConfiguration(confLocal);
@@ -2094,7 +2096,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         DistributedLogManager readDLM1 = createNewDLM(readConf1, name);
         AsyncLogReader reader1 = readDLM1.getAsyncLogReader(DLSN.InitialDLSN);
         for (int i = 0; i < 15; i++) {
-            LogRecordWithDLSN record = FutureUtils.result(reader1.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader1.readNext());
             if (i < 5) {
                 assertEquals(new DLSN(1L, i, 0L), record.getDlsn());
                 assertEquals(1L + i, record.getTransactionId());
@@ -2118,7 +2120,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         DistributedLogManager readDLM2 = createNewDLM(readConf2, name);
         AsyncLogReader reader2 = readDLM2.getAsyncLogReader(DLSN.InitialDLSN);
         for (int i = 0; i < 11; i++) {
-            LogRecordWithDLSN record = FutureUtils.result(reader2.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader2.readNext());
             LOG.info("Read record {}", record);
             if (i < 5) {
                 assertEquals(new DLSN(1L, i, 0L), record.getDlsn());
@@ -2159,12 +2161,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         ensureURICreated(uri);
 
         DistributedLogManager dlm = createNewDLM(confLocal, name);
-        BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) Utils.ioResult(dlm.openAsyncLogWriter());
         writer.write(DLMTestUtil.getLogRecordInstance(1L));
 
-        AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        AsyncLogReader reader = Utils.ioResult(dlm.openAsyncLogReader(DLSN.InitialDLSN));
         try {
-            FutureUtils.result(reader.readNext());
+            Utils.ioResult(reader.readNext());
             fail("Should fail when stream is idle");
         } catch (IdleReaderException ire) {
             // expected
@@ -2191,11 +2193,11 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         ensureURICreated(uri);
 
         DistributedLogManager dlm = createNewDLM(confLocal, name);
-        BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) Utils.ioResult(dlm.openAsyncLogWriter());
         writer.write(DLMTestUtil.getLogRecordInstance(1L));
 
-        AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
-        LogRecordWithDLSN record = FutureUtils.result(reader.readNext());
+        AsyncLogReader reader = Utils.ioResult(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
         assertEquals(1L, record.getTransactionId());
         DLMTestUtil.verifyLogRecord(record);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index dff0133..18e097f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -23,10 +23,19 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.api.MetadataAccessor;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
 import org.apache.distributedlog.exceptions.BKTransmitException;
 import org.apache.distributedlog.exceptions.LogEmptyException;
@@ -35,7 +44,6 @@ import org.apache.distributedlog.exceptions.LogReadException;
 import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
 import org.apache.distributedlog.io.Abortables;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.bookkeeper.client.BKException;
@@ -54,12 +62,8 @@ import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException;
 import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.MetadataUpdater;
 import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
 
 import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
@@ -89,7 +93,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
             BKLogWriteHandler blplm = dlm.createWriteHandler(true);
             assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                                                                 perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-            FutureUtils.result(blplm.asyncClose());
+            Utils.ioResult(blplm.asyncClose());
         }
 
         LogWriter writer = dlm.startLogSegmentNonPartitioned();
@@ -129,7 +133,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         BKLogWriteHandler blplm = dlm.createWriteHandler(true);
         assertNotNull(zkc.exists(blplm.completedLedgerZNode(1, 100,
                 perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-        FutureUtils.result(blplm.asyncClose());
+        Utils.ioResult(blplm.asyncClose());
     }
 
     @Test(timeout = 60000)
@@ -167,7 +171,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
             assertNotNull(
                 zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                                                       perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-            FutureUtils.result(blplm.asyncClose());
+            Utils.ioResult(blplm.asyncClose());
         }
 
         BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned();
@@ -263,14 +267,14 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         confLocal.setWriteLockEnabled(false);
         String name = "distrlog-two-writers-lock-disabled";
         DistributedLogManager manager = createNewDLM(confLocal, name);
-        AsyncLogWriter writer1 = FutureUtils.result(manager.openAsyncLogWriter());
-        FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(1L)));
-        AsyncLogWriter writer2 = FutureUtils.result(manager.openAsyncLogWriter());
-        FutureUtils.result(writer2.write(DLMTestUtil.getLogRecordInstance(2L)));
+        AsyncLogWriter writer1 = Utils.ioResult(manager.openAsyncLogWriter());
+        Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(1L)));
+        AsyncLogWriter writer2 = Utils.ioResult(manager.openAsyncLogWriter());
+        Utils.ioResult(writer2.write(DLMTestUtil.getLogRecordInstance(2L)));
 
         // write a record to writer 1 again
         try {
-            FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(3L)));
+            Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(3L)));
             fail("Should fail writing record to writer 1 again as writer 2 took over the ownership");
         } catch (BKTransmitException bkte) {
             assertEquals(BKException.Code.LedgerFencedException, bkte.getBKResultCode());
@@ -311,7 +315,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
             assertNotNull(
                 zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                                                       perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-            FutureUtils.result(blplm.asyncClose());
+            Utils.ioResult(blplm.asyncClose());
         }
         BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
         for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) {
@@ -394,7 +398,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
             assertNotNull(
                 zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1,
                                                       perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-            FutureUtils.result(blplm.asyncClose());
+            Utils.ioResult(blplm.asyncClose());
         }
 
         BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
@@ -411,14 +415,14 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
 
         AsyncLogReader asyncreader = dlm.getAsyncLogReader(DLSN.InvalidDLSN);
         long numTrans = 0;
-        LogRecordWithDLSN record = Await.result(asyncreader.readNext());
+        LogRecordWithDLSN record = Utils.ioResult(asyncreader.readNext());
         while (null != record) {
             DLMTestUtil.verifyLogRecord(record);
             numTrans++;
             if (numTrans >= (txid - 1)) {
                 break;
             }
-            record = Await.result(asyncreader.readNext());
+            record = Utils.ioResult(asyncreader.readNext());
         }
         assertEquals((txid - 1), numTrans);
         Utils.close(asyncreader);
@@ -459,12 +463,12 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         dlm.close();
 
         URI uri = createDLMURI("/" + name);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
         assertTrue(namespace.logExists(name));
         assertFalse(namespace.logExists("non-existent-log"));
         URI nonExistentUri = createDLMURI("/" + "non-existent-ns");
-        DistributedLogNamespace nonExistentNS = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace nonExistentNS = NamespaceBuilder.newBuilder()
                 .conf(conf).uri(nonExistentUri).build();
         assertFalse(nonExistentNS.logExists(name));
 
@@ -508,31 +512,31 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         SubscriptionsStore store = dlm.getSubscriptionsStore();
 
         // no data
-        assertEquals(Await.result(store.getLastCommitPosition(subscriber0)), DLSN.NonInclusiveLowerBound);
-        assertEquals(Await.result(store.getLastCommitPosition(subscriber1)), DLSN.NonInclusiveLowerBound);
-        assertEquals(Await.result(store.getLastCommitPosition(subscriber2)), DLSN.NonInclusiveLowerBound);
+        assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber0)), DLSN.NonInclusiveLowerBound);
+        assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber1)), DLSN.NonInclusiveLowerBound);
+        assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber2)), DLSN.NonInclusiveLowerBound);
         // empty
-        assertTrue(Await.result(store.getLastCommitPositions()).isEmpty());
+        assertTrue(Utils.ioResult(store.getLastCommitPositions()).isEmpty());
 
         // subscriber 0 advance
-        Await.result(store.advanceCommitPosition(subscriber0, commitPosition0));
-        assertEquals(commitPosition0, Await.result(store.getLastCommitPosition(subscriber0)));
-        Map<String, DLSN> committedPositions = Await.result(store.getLastCommitPositions());
+        Utils.ioResult(store.advanceCommitPosition(subscriber0, commitPosition0));
+        assertEquals(commitPosition0, Utils.ioResult(store.getLastCommitPosition(subscriber0)));
+        Map<String, DLSN> committedPositions = Utils.ioResult(store.getLastCommitPositions());
         assertEquals(1, committedPositions.size());
         assertEquals(commitPosition0, committedPositions.get(subscriber0));
 
         // subscriber 1 advance
-        Await.result(store.advanceCommitPosition(subscriber1, commitPosition1));
-        assertEquals(commitPosition1, Await.result(store.getLastCommitPosition(subscriber1)));
-        committedPositions = Await.result(store.getLastCommitPositions());
+        Utils.ioResult(store.advanceCommitPosition(subscriber1, commitPosition1));
+        assertEquals(commitPosition1, Utils.ioResult(store.getLastCommitPosition(subscriber1)));
+        committedPositions = Utils.ioResult(store.getLastCommitPositions());
         assertEquals(2, committedPositions.size());
         assertEquals(commitPosition0, committedPositions.get(subscriber0));
         assertEquals(commitPosition1, committedPositions.get(subscriber1));
 
         // subscriber 2 advance
-        Await.result(store.advanceCommitPosition(subscriber2, commitPosition2));
-        assertEquals(commitPosition2, Await.result(store.getLastCommitPosition(subscriber2)));
-        committedPositions = Await.result(store.getLastCommitPositions());
+        Utils.ioResult(store.advanceCommitPosition(subscriber2, commitPosition2));
+        assertEquals(commitPosition2, Utils.ioResult(store.getLastCommitPosition(subscriber2)));
+        committedPositions = Utils.ioResult(store.getLastCommitPositions());
         assertEquals(3, committedPositions.size());
         assertEquals(commitPosition0, committedPositions.get(subscriber0));
         assertEquals(commitPosition1, committedPositions.get(subscriber1));
@@ -541,11 +545,11 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         // subscriber 2 advance again
         DistributedLogManager newDLM = createNewDLM(conf, name);
         SubscriptionsStore newStore = newDLM.getSubscriptionsStore();
-        Await.result(newStore.advanceCommitPosition(subscriber2, commitPosition3));
+        Utils.ioResult(newStore.advanceCommitPosition(subscriber2, commitPosition3));
         newStore.close();
         newDLM.close();
 
-        committedPositions = Await.result(store.getLastCommitPositions());
+        committedPositions = Utils.ioResult(store.getLastCommitPositions());
         assertEquals(3, committedPositions.size());
         assertEquals(commitPosition0, committedPositions.get(subscriber0));
         assertEquals(commitPosition1, committedPositions.get(subscriber1));
@@ -570,13 +574,13 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
                 BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
                 assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                                                                     perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-                FutureUtils.result(blplm.asyncClose());
+                Utils.ioResult(blplm.asyncClose());
             } else {
                 writer.markEndOfStream();
                 BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
                 assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, DistributedLogConstants.MAX_TXID,
                                                                     perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-                FutureUtils.result(blplm.asyncClose());
+                Utils.ioResult(blplm.asyncClose());
             }
         }
         return txid;
@@ -698,8 +702,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
     @Test(timeout = 60000, expected = LogRecordTooLongException.class)
     public void testMaxLogRecSize() throws Exception {
         DistributedLogManager dlm = createNewDLM(conf, "distrlog-maxlogRecSize");
-        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
-        FutureUtils.result(writer.write(new LogRecord(1L, DLMTestUtil.repeatString(
+        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+        Utils.ioResult(writer.write(new LogRecord(1L, DLMTestUtil.repeatString(
                                 DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes())));
     }
 
@@ -710,21 +714,21 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         confLocal.setOutputBufferSize(1024 * 1024);
         BKDistributedLogManager dlm =
                 createNewDLM(confLocal, "distrlog-transmissionSize");
-        AsyncLogWriter out = FutureUtils.result(dlm.openAsyncLogWriter());
+        AsyncLogWriter out = Utils.ioResult(dlm.openAsyncLogWriter());
         boolean exceptionEncountered = false;
         byte[] largePayload = new byte[(LogRecord.MAX_LOGRECORDSET_SIZE / 2) + 2];
         RAND.nextBytes(largePayload);
         try {
             LogRecord op = new LogRecord(1L, largePayload);
-            Future<DLSN> firstWriteFuture = out.write(op);
+            CompletableFuture<DLSN> firstWriteFuture = out.write(op);
             op = new LogRecord(2L, largePayload);
             // the second write will flush the first one, since we reached the maximum transmission size.
             out.write(op);
-            FutureUtils.result(firstWriteFuture);
+            Utils.ioResult(firstWriteFuture);
         } catch (LogRecordTooLongException exc) {
             exceptionEncountered = true;
         } finally {
-            FutureUtils.result(out.asyncClose());
+            Utils.ioResult(out.asyncClose());
         }
         assertFalse(exceptionEncountered);
         Abortables.abortQuietly(out);
@@ -750,7 +754,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
             BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
             assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                                                                 perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-            FutureUtils.result(blplm.asyncClose());
+            Utils.ioResult(blplm.asyncClose());
         }
 
         LogReader reader = dlm.getInputStream(1);
@@ -819,7 +823,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
             assertNotNull(
                 zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1,
                                                       writer.getLogSegmentSequenceNumber()), false));
-            FutureUtils.result(blplm.asyncClose());
+            Utils.ioResult(blplm.asyncClose());
         }
 
         BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
@@ -857,7 +861,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
 
         BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
 
-        FutureUtils.result(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true));
+        Utils.ioResult(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true));
         dlm.registerListener(new LogSegmentListener() {
             @Override
             public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
@@ -931,7 +935,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         for (int i = 0; i < 10; i++) {
             LogRecord record = DLMTestUtil.getLogRecordInstance(txid++);
             record.setControl();
-            Await.result(writer.writeControlRecord(record));
+            Utils.ioResult(writer.writeControlRecord(record));
         }
         LOG.info("10 control records are written");
 
@@ -946,14 +950,14 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         LOG.info("Completed first log segment");
 
         writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
-        Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
         LOG.info("Completed second log segment");
 
         LOG.info("Writing another 10 control records");
         for (int i = 1; i < 10; i++) {
             LogRecord record = DLMTestUtil.getLogRecordInstance(txid++);
             record.setControl();
-            Await.result(writer.write(record));
+            Utils.ioResult(writer.write(record));
         }
 
         assertEquals(new DLSN(2, 0, 0), dlm.getLastDLSN());
@@ -973,8 +977,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
         DLMTestUtil.generateCompletedLogSegments(dlm, conf, 2, 10);
 
-        Future<Long> futureCount = dlm.getLogRecordCountAsync(DLSN.InitialDLSN);
-        Long count = Await.result(futureCount, Duration.fromSeconds(2));
+        CompletableFuture<Long> futureCount = dlm.getLogRecordCountAsync(DLSN.InitialDLSN);
+        Long count = Utils.ioResult(futureCount, 2, TimeUnit.SECONDS);
         assertEquals(20, count.longValue());
 
         writer.close();
@@ -986,7 +990,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         String baseName = testNames.getMethodName();
         String streamName = "\0blah";
         URI uri = createDLMURI("/" + baseName);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
 
         DistributedLogManager dlm = null;
@@ -1036,15 +1040,15 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
             BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
             for (long j = 1; j <= 10; j++) {
                 LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
-                Future<DLSN> dlsn = writer.write(record);
+                CompletableFuture<DLSN> dlsn = writer.write(record);
 
                 if (i == 1 && j == 2) {
-                    truncDLSN = Await.result(dlsn);
+                    truncDLSN = Utils.ioResult(dlsn);
                 } else if (i == 2 && j == 3) {
-                    beyondTruncDLSN = Await.result(dlsn);
+                    beyondTruncDLSN = Utils.ioResult(dlsn);
                     beyondTruncTxId = record.getTransactionId();
                 } else if (j == 10) {
-                    Await.ready(dlsn);
+                    Utils.ioResult(dlsn);
                 }
             }
 
@@ -1065,7 +1069,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
 
         MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(
                 confLocal, metadataStore);
-        FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(1L)));
+        Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(1L)));
 
         segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
                 LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName()));
@@ -1088,7 +1092,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         }
 
         updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore);
-        FutureUtils.result(updater.setLogSegmentActive(segmentList.get(1L)));
+        Utils.ioResult(updater.setLogSegmentActive(segmentList.get(1L)));
 
         segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
                 LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName()));
@@ -1096,7 +1100,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         LOG.info("Read segments after marked first segment as active : {}", segmentList);
 
         updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore);
-        FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(2L)));
+        Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(2L)));
 
         segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
                 LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName()));
@@ -1109,7 +1113,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
             boolean exceptionEncountered = false;
             try {
                 for (int i = 0; i < 3 * 10; i++) {
-                    LogRecordWithDLSN record = Await.result(reader.readNext());
+                    LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
                     DLMTestUtil.verifyLargeLogRecord(record);
                     assertEquals(expectedTxId, record.getTransactionId());
                     expectedTxId++;
@@ -1122,10 +1126,10 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         }
 
         updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
-        FutureUtils.result(updater.setLogSegmentActive(segmentList.get(2L)));
+        Utils.ioResult(updater.setLogSegmentActive(segmentList.get(2L)));
 
         BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
-        Assert.assertTrue(Await.result(writer.truncate(truncDLSN)));
+        Assert.assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
         BKLogWriteHandler handler = writer.getCachedWriteHandler();
         List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR);
         for (LogSegmentMetadata segment: cachedSegments) {
@@ -1164,7 +1168,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
 
         {
             AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN);
-            LogRecordWithDLSN record = Await.result(reader.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
             assertTrue(record != null);
             assertEquals(truncDLSN, record.getDlsn());
             Utils.close(reader);
@@ -1190,7 +1194,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
 
         {
             AsyncLogReader reader = dlm.getAsyncLogReader(beyondTruncDLSN);
-            LogRecordWithDLSN record = Await.result(reader.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
             assertTrue(record != null);
             assertEquals(beyondTruncDLSN, record.getDlsn());
             Utils.close(reader);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
index e0f2bab..2078a88 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
@@ -28,14 +28,17 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Sets;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.callback.NamespaceListener;
 import org.apache.distributedlog.exceptions.AlreadyClosedException;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.util.DLUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -95,8 +98,8 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
         DistributedLogConfiguration newConf = new DistributedLogConfiguration();
         newConf.addConfiguration(conf);
         newConf.setCreateStreamIfNotExists(false);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
-                .conf(newConf).uri(uri).build();
+        Namespace namespace = NamespaceBuilder.newBuilder()
+            .conf(newConf).uri(uri).build();
         DistributedLogManager dlm = namespace.openLog(logName);
         LogWriter writer;
         try {
@@ -118,7 +121,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
         newConf.addConfiguration(conf);
         newConf.setCreateStreamIfNotExists(false);
         String streamName = "test-stream";
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(newConf).uri(uri).build();
         DistributedLogManager dlm = namespace.openLog(streamName);
         LogWriter writer;
@@ -148,7 +151,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
 
         URI uri = createDLMURI("/" + runtime.getMethodName());
 
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
 
         try {
@@ -225,7 +228,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
     public void testNamespaceListener() throws Exception {
         URI uri = createDLMURI("/" + runtime.getMethodName());
         zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
         final CountDownLatch[] latches = new CountDownLatch[3];
         for (int i = 0; i < 3; i++) {
@@ -268,7 +271,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
         newConf.addConfiguration(conf);
         newConf.setCreateStreamIfNotExists(true);
         newConf.setZkAclId(un);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(newConf).uri(uri).build();
         DistributedLogManager dlm = namespace.openLog(streamName);
         LogWriter writer = dlm.startLogSegmentNonPartitioned();
@@ -400,7 +403,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
     @Test(timeout = 60000)
     public void testUseNamespaceAfterCloseShouldFailFast() throws Exception {
         URI uri = createDLMURI("/" + runtime.getMethodName());
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
             .conf(conf)
             .uri(uri)
             .build();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
index 854fd61..4915137 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
@@ -18,20 +18,21 @@
 package org.apache.distributedlog;
 
 import com.google.common.base.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogWriter;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.logsegment.LogSegmentFilter;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.Await;
 
 import java.util.List;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
-import com.twitter.util.TimeoutException;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -78,21 +79,21 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         DistributedLogManager dlm1 = createNewDLM(confLocal, dlName);
         long txid = 1;
 
-        ArrayList<Future<DLSN>> futures = new ArrayList<Future<DLSN>>(numEntriesPerSegment);
+        ArrayList<CompletableFuture<DLSN>> futures = new ArrayList<CompletableFuture<DLSN>>(numEntriesPerSegment);
         AsyncLogWriter out = dlm1.startAsyncLogSegmentNonPartitioned();
         for (int eid = 0; eid < numEntriesPerSegment; ++eid) {
             futures.add(out.write(DLMTestUtil.getLogRecordInstance(txid)));
             ++txid;
         }
-        FutureUtils.result(Future.collect(futures));
+        Utils.ioResult(FutureUtils.collect(futures));
         // commit
         LogRecord controlRecord = new LogRecord(txid, DistributedLogConstants.CONTROL_RECORD_CONTENT);
         controlRecord.setControl();
-        FutureUtils.result(out.write(controlRecord));
+        Utils.ioResult(out.write(controlRecord));
 
         DLSN last = dlm1.getLastDLSN();
         assertEquals(new DLSN(1,9,0), last);
-        DLSN first = Await.result(dlm1.getFirstDLSNAsync());
+        DLSN first = Utils.ioResult(dlm1.getFirstDLSNAsync());
         assertEquals(new DLSN(1,0,0), first);
         Utils.close(out);
     }
@@ -102,9 +103,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         String dlName = runtime.getMethodName();
         BKDistributedLogManager dlm = createNewDLM(conf, dlName);
         BKLogReadHandler readHandler = dlm.createReadHandler();
-        Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
+        CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
         try {
-            Await.result(futureRecord);
+            Utils.ioResult(futureRecord);
             fail("should have thrown exception");
         } catch (LogNotFoundException ex) {
         }
@@ -116,9 +117,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         BKDistributedLogManager dlm = createNewDLM(conf, dlName);
         DLMTestUtil.generateCompletedLogSegments(dlm, conf, 3, 3);
         BKLogReadHandler readHandler = dlm.createReadHandler();
-        Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
+        CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
         try {
-            LogRecordWithDLSN record = Await.result(futureRecord);
+            LogRecordWithDLSN record = Utils.ioResult(futureRecord);
             assertEquals(new DLSN(1, 0, 0), record.getDlsn());
         } catch (Exception ex) {
             fail("should not have thrown exception: " + ex);
@@ -133,11 +134,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         BKLogReadHandler readHandler =
             ((BKDistributedLogManager) dlm).createReadHandler();
         AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
-        Future<Boolean> futureSuccess = writer.truncate(new DLSN(2, 0, 0));
-        Boolean success = Await.result(futureSuccess);
+        CompletableFuture<Boolean> futureSuccess = writer.truncate(new DLSN(2, 0, 0));
+        Boolean success = Utils.ioResult(futureSuccess);
         assertTrue(success);
-        Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
-        LogRecordWithDLSN record = Await.result(futureRecord);
+        CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
+        LogRecordWithDLSN record = Utils.ioResult(futureRecord);
         assertEquals(new DLSN(2, 0, 0), record.getDlsn());
     }
 
@@ -151,11 +152,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
 
         // Only truncates at ledger boundary.
-        Future<Boolean> futureSuccess = writer.truncate(new DLSN(2, 5, 0));
-        Boolean success = Await.result(futureSuccess);
+        CompletableFuture<Boolean> futureSuccess = writer.truncate(new DLSN(2, 5, 0));
+        Boolean success = Utils.ioResult(futureSuccess);
         assertTrue(success);
-        Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
-        LogRecordWithDLSN record = Await.result(futureRecord);
+        CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
+        LogRecordWithDLSN record = Utils.ioResult(futureRecord);
         assertEquals(new DLSN(2, 0, 0), record.getDlsn());
     }
 
@@ -164,10 +165,10 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         String dlName = runtime.getMethodName();
         DistributedLogManager dlm = createNewDLM(conf, dlName);
         BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(DLSN.InitialDLSN);
         try {
-            Await.result(count);
+            Utils.ioResult(count);
             fail("log is empty, should have returned log empty ex");
         } catch (LogNotFoundException ex) {
         }
@@ -179,9 +180,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         prepareLogSegmentsNonPartitioned(dlName, 11, 3);
         DistributedLogManager dlm = createNewDLM(conf, dlName);
         BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(DLSN.InitialDLSN);
-        assertEquals(33, Await.result(count).longValue());
+        assertEquals(33, Utils.ioResult(count).longValue());
     }
 
     @Test(timeout = 60000)
@@ -190,11 +191,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         prepareLogSegmentsNonPartitioned(dlName, 11, 3);
         DistributedLogManager dlm = createNewDLM(conf, dlName);
         BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(new DLSN(2, 0, 0));
-        assertEquals(30, Await.result(count).longValue());
+        assertEquals(30, Utils.ioResult(count).longValue());
         count = readHandler.asyncGetLogRecordCount(new DLSN(3, 0, 0));
-        assertEquals(27, Await.result(count).longValue());
+        assertEquals(27, Utils.ioResult(count).longValue());
     }
 
     @Test(timeout = 60000)
@@ -203,9 +204,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         prepareLogSegmentsNonPartitioned(dlName, 11, 3);
         DistributedLogManager dlm = createNewDLM(conf, dlName);
         BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(new DLSN(12, 0, 0));
-        assertEquals(0, Await.result(count).longValue());
+        assertEquals(0, Utils.ioResult(count).longValue());
     }
 
     @Test(timeout = 60000)
@@ -214,9 +215,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         prepareLogSegmentsNonPartitioned(dlName, 11, 3);
         DistributedLogManager dlm = createNewDLM(conf, dlName);
         BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(new DLSN(11, 2, 0));
-        assertEquals(1, Await.result(count).longValue());
+        assertEquals(1, Utils.ioResult(count).longValue());
     }
 
     @Test(timeout = 60000)
@@ -225,11 +226,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         prepareLogSegmentsNonPartitioned(dlName, 5, 10);
         DistributedLogManager dlm = createNewDLM(conf, dlName);
         BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(new DLSN(3, 5, 0));
-        assertEquals(25, Await.result(count).longValue());
+        assertEquals(25, Utils.ioResult(count).longValue());
         count = readHandler.asyncGetLogRecordCount(new DLSN(2, 5, 0));
-        assertEquals(35, Await.result(count).longValue());
+        assertEquals(35, Utils.ioResult(count).longValue());
     }
 
     @Test(timeout = 60000)
@@ -239,9 +240,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 5, txid);
         txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 0, 10, txid);
         BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
-        assertEquals(15, Await.result(count).longValue());
+        assertEquals(15, Utils.ioResult(count).longValue());
     }
 
     @Test(timeout = 60000)
@@ -251,9 +252,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 0, txid);
         txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 10, 0, txid);
         BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
-        assertEquals(0, Await.result(count).longValue());
+        assertEquals(0, Utils.ioResult(count).longValue());
     }
 
     @Test(timeout = 60000)
@@ -264,12 +265,12 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned();
         int txid = 1;
 
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
 
         BKLogReadHandler readHandler = bkdlm.createReadHandler();
-        List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+        List<LogSegmentMetadata> ledgerList = Utils.ioResult(
                 readHandler.readLogSegmentsFromStore(
                         LogSegmentMetadata.COMPARATOR,
                         LogSegmentFilter.DEFAULT_FILTER,
@@ -279,9 +280,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         assertEquals(1, ledgerList.size());
         assertTrue(ledgerList.get(0).isInProgress());
 
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
-        assertEquals(2, Await.result(count).longValue());
+        assertEquals(2, Utils.ioResult(count).longValue());
 
         Utils.close(out);
     }
@@ -294,12 +295,12 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         long txid = 1;
         txid += DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, txid);
         AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned();
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
-        Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+        Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
 
         BKLogReadHandler readHandler = bkdlm.createReadHandler();
-        List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+        List<LogSegmentMetadata> ledgerList = Utils.ioResult(
                 readHandler.readLogSegmentsFromStore(
                         LogSegmentMetadata.COMPARATOR,
                         LogSegmentFilter.DEFAULT_FILTER,
@@ -309,9 +310,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         assertFalse(ledgerList.get(0).isInProgress());
         assertTrue(ledgerList.get(1).isInProgress());
 
-        Future<Long> count = null;
+        CompletableFuture<Long> count = null;
         count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
-        assertEquals(7, Await.result(count).longValue());
+        assertEquals(7, Utils.ioResult(count).longValue());
 
         Utils.close(out);
     }
@@ -322,14 +323,14 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName);
         BKLogReadHandler readHandler = bkdlm.createReadHandler();
         try {
-            Await.result(readHandler.lockStream());
+            Utils.ioResult(readHandler.lockStream());
             fail("Should fail lock stream if log not found");
         } catch (LogNotFoundException ex) {
         }
 
         BKLogReadHandler subscriberReadHandler = bkdlm.createReadHandler(Optional.of("test-subscriber"));
         try {
-            Await.result(subscriberReadHandler.lockStream());
+            Utils.ioResult(subscriberReadHandler.lockStream());
             fail("Subscriber should fail lock stream if log not found");
         } catch (LogNotFoundException ex) {
             // expected
@@ -342,17 +343,17 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         BKDistributedLogManager bkdlm = createNewDLM(conf, streamName);
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1);
         BKLogReadHandler readHandler = bkdlm.createReadHandler();
-        Await.result(readHandler.lockStream());
+        Utils.ioResult(readHandler.lockStream());
 
         // two subscribers could lock stream in parallel
         BKDistributedLogManager bkdlm10 = createNewDLM(conf, streamName);
         BKLogReadHandler s10Handler =
                 bkdlm10.createReadHandler(Optional.of("s1"));
-        Await.result(s10Handler.lockStream());
+        Utils.ioResult(s10Handler.lockStream());
         BKDistributedLogManager bkdlm20 = createNewDLM(conf, streamName);
         BKLogReadHandler s20Handler =
                 bkdlm20.createReadHandler(Optional.of("s2"));
-        Await.result(s20Handler.lockStream());
+        Utils.ioResult(s20Handler.lockStream());
 
         readHandler.asyncClose();
         bkdlm.close();
@@ -368,19 +369,19 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
         BKDistributedLogManager bkdlm = createNewDLM(conf, streamName);
         DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1);
         BKLogReadHandler readHandler = bkdlm.createReadHandler();
-        Await.result(readHandler.lockStream());
+        Utils.ioResult(readHandler.lockStream());
 
         // same subscrbiers couldn't lock stream in parallel
         BKDistributedLogManager bkdlm10 = createNewDLM(conf, streamName);
         BKLogReadHandler s10Handler =
                 bkdlm10.createReadHandler(Optional.of("s1"));
-        Await.result(s10Handler.lockStream());
+        Utils.ioResult(s10Handler.lockStream());
 
         BKDistributedLogManager bkdlm11 = createNewDLM(conf, streamName);
         BKLogReadHandler s11Handler =
                 bkdlm11.createReadHandler(Optional.of("s1"));
         try {
-            Await.result(s11Handler.lockStream(), Duration.apply(10000, TimeUnit.MILLISECONDS));
+            Utils.ioResult(s11Handler.lockStream(), 10000, TimeUnit.MILLISECONDS);
             fail("Should fail lock stream using same subscriber id");
         } catch (OwnershipAcquireFailedException oafe) {
             // expected