You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:37 UTC

[05/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java
index 9258922..da4ef81 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java
@@ -18,8 +18,9 @@
 package org.apache.distributedlog;
 
 import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion;
-import com.twitter.util.Await;
-import com.twitter.util.FutureEventListener;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.util.Utils;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,7 +73,7 @@ public class TestSequenceID extends TestDistributedLogBase {
 
         BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(confLocal, name);
         BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
-        Await.result(writer.write(DLMTestUtil.getLogRecordInstance(0L)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0L)));
 
         dlm.close();
 
@@ -126,16 +127,16 @@ public class TestSequenceID extends TestDistributedLogBase {
         for (int i = 0; i < 3; i++) {
             BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
             for (int j = 0; j < 2; j++) {
-                Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+                Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
 
                 if (null == reader) {
                     reader = readDLM.getAsyncLogReader(DLSN.InitialDLSN);
                     final AsyncLogReader r = reader;
-                    reader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+                    reader.readNext().whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
                         @Override
                         public void onSuccess(LogRecordWithDLSN record) {
                             readRecords.add(record);
-                            r.readNext().addEventListener(this);
+                            r.readNext().whenComplete(this);
                         }
 
                         @Override
@@ -149,7 +150,7 @@ public class TestSequenceID extends TestDistributedLogBase {
         }
 
         BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
-        Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
 
         List<LogSegmentMetadata> segments = dlm.getLogSegments();
         assertEquals(4, segments.size());
@@ -174,12 +175,12 @@ public class TestSequenceID extends TestDistributedLogBase {
         for (int i = 0; i < 3; i++) {
             BKAsyncLogWriter writerv5 = dlmv5.startAsyncLogSegmentNonPartitioned();
             for (int j = 0; j < 2; j++) {
-                Await.result(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
+                Utils.ioResult(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
             }
             writerv5.closeAndComplete();
         }
         BKAsyncLogWriter writerv5 = dlmv5.startAsyncLogSegmentNonPartitioned();
-        Await.result(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
+        Utils.ioResult(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
 
         List<LogSegmentMetadata> segmentsv5 = dlmv5.getLogSegments();
         assertEquals(8, segmentsv5.size());
@@ -205,7 +206,7 @@ public class TestSequenceID extends TestDistributedLogBase {
         for (int i = 0; i < 3; i++) {
             BKAsyncLogWriter writerv4 = dlmv4.startAsyncLogSegmentNonPartitioned();
             for (int j = 0; j < 2; j++) {
-                Await.result(writerv4.write(DLMTestUtil.getLogRecordInstance(txId++)));
+                Utils.ioResult(writerv4.write(DLMTestUtil.getLogRecordInstance(txId++)));
             }
             writerv4.closeAndComplete();
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java
index 5b26a70..06708c8 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.util.Utils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -31,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.distributedlog.LogSegmentMetadata.TruncationStatus;
-import com.twitter.util.Await;
 
 import static org.junit.Assert.*;
 
@@ -96,11 +97,11 @@ public class TestTruncate extends TestDistributedLogBase {
         AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
         long txid = 1 + 10 * 10;
         for (int j = 1; j <= 10; j++) {
-            Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
         }
 
         // to make sure the truncation task is executed
-        DLSN lastDLSN = Await.result(dlm.getLastDLSNAsync());
+        DLSN lastDLSN = Utils.ioResult(dlm.getLastDLSNAsync());
         LOG.info("Get last dlsn of stream {} : {}", name, lastDLSN);
 
         assertEquals(6, distributedLogManager.getLogSegments().size());
@@ -123,20 +124,20 @@ public class TestTruncate extends TestDistributedLogBase {
         Thread.sleep(1000);
 
         // delete invalid dlsn
-        assertFalse(Await.result(pair.getRight().truncate(DLSN.InvalidDLSN)));
+        assertFalse(Utils.ioResult(pair.getRight().truncate(DLSN.InvalidDLSN)));
         verifyEntries(name, 1, 1, 5 * 10);
 
         for (int i = 1; i <= 4; i++) {
             int txn = (i-1) * 10 + i;
             DLSN dlsn = txid2DLSN.get((long)txn);
-            assertTrue(Await.result(pair.getRight().truncate(dlsn)));
+            assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn)));
             verifyEntries(name, 1, (i - 1) * 10 + 1, (5 - i + 1) * 10);
         }
 
         // Delete higher dlsn
         int txn = 43;
         DLSN dlsn = txid2DLSN.get((long) txn);
-        assertTrue(Await.result(pair.getRight().truncate(dlsn)));
+        assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn)));
         verifyEntries(name, 1, 41, 10);
 
         Utils.close(pair.getRight());
@@ -160,14 +161,14 @@ public class TestTruncate extends TestDistributedLogBase {
         for (int i = 1; i <= 4; i++) {
             int txn = (i-1) * 10 + i;
             DLSN dlsn = txid2DLSN.get((long)txn);
-            assertTrue(Await.result(pair.getRight().truncate(dlsn)));
+            assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn)));
             verifyEntries(name, 1, (i - 1) * 10 + 1, (5 - i + 1) * 10);
         }
 
         // Delete higher dlsn
         int txn = 43;
         DLSN dlsn = txid2DLSN.get((long) txn);
-        assertTrue(Await.result(pair.getRight().truncate(dlsn)));
+        assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn)));
         verifyEntries(name, 1, 41, 10);
 
         Utils.close(pair.getRight());
@@ -176,7 +177,7 @@ public class TestTruncate extends TestDistributedLogBase {
         // Try force truncation
         BKDistributedLogManager dlm = (BKDistributedLogManager)createNewDLM(confLocal, name);
         BKLogWriteHandler handler = dlm.createWriteHandler(true);
-        FutureUtils.result(handler.purgeLogSegmentsOlderThanTxnId(Integer.MAX_VALUE));
+        Utils.ioResult(handler.purgeLogSegmentsOlderThanTxnId(Integer.MAX_VALUE));
 
         verifyEntries(name, 1, 41, 10);
     }
@@ -230,11 +231,11 @@ public class TestTruncate extends TestDistributedLogBase {
         AsyncLogWriter newWriter = newDLM.startAsyncLogSegmentNonPartitioned();
         long txid = 1 + 4 * 10;
         for (int j = 1; j <= 10; j++) {
-            Await.result(newWriter.write(DLMTestUtil.getLogRecordInstance(txid++)));
+            Utils.ioResult(newWriter.write(DLMTestUtil.getLogRecordInstance(txid++)));
         }
 
         // to make sure the truncation task is executed
-        DLSN lastDLSN = Await.result(newDLM.getLastDLSNAsync());
+        DLSN lastDLSN = Utils.ioResult(newDLM.getLastDLSNAsync());
         LOG.info("Get last dlsn of stream {} : {}", name, lastDLSN);
 
         assertEquals(5, newDLM.getLogSegments().size());
@@ -277,7 +278,7 @@ public class TestTruncate extends TestDistributedLogBase {
 
         DistributedLogManager newDLM = createNewDLM(confLocal, name);
         AsyncLogWriter newWriter = newDLM.startAsyncLogSegmentNonPartitioned();
-        Await.result(newWriter.truncate(dlsnMap.get(15L)));
+        Utils.ioResult(newWriter.truncate(dlsnMap.get(15L)));
 
         List<LogSegmentMetadata> newSegments2 = newDLM.getLogSegments();
         assertArrayEquals(newSegments.toArray(new LogSegmentMetadata[4]),
@@ -299,7 +300,7 @@ public class TestTruncate extends TestDistributedLogBase {
             AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
             for (int j = 1; j <= numEntriesPerLogSegment; j++) {
                 long curTxId = txid++;
-                DLSN dlsn = Await.result(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
+                DLSN dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
                 txid2DLSN.put(curTxId, dlsn);
             }
             Utils.close(writer);
@@ -311,7 +312,7 @@ public class TestTruncate extends TestDistributedLogBase {
             AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
             for (int j = 1; j <= 10; j++) {
                 long curTxId = txid++;
-                DLSN dlsn = Await.result(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
+                DLSN dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
                 txid2DLSN.put(curTxId, dlsn);
             }
             return new ImmutablePair<DistributedLogManager, AsyncLogWriter>(dlm, writer);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java
index c28437f..0d0ca99 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java
@@ -18,7 +18,6 @@
 package org.apache.distributedlog;
 
 import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.util.PermitLimiter;
 import org.apache.distributedlog.util.SimplePermitLimiter;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.SettableFeature;
@@ -30,7 +29,6 @@ import org.slf4j.LoggerFactory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import scala.runtime.BoxedUnit;
 
 public class TestWriteLimiter {
     static final Logger LOG = LoggerFactory.getLogger(TestWriteLimiter.class);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java
index a1c075f..75bcda2 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java
@@ -19,7 +19,7 @@ package org.apache.distributedlog;
 
 import org.apache.distributedlog.ZooKeeperClient.Credentials;
 import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.zookeeper.CreateMode;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java
index 8d88a37..45fc1f3 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java
@@ -17,20 +17,20 @@
  */
 package org.apache.distributedlog.acl;
 
+import java.net.URI;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClusterTestCase;
 import org.apache.distributedlog.impl.acl.ZKAccessControl;
 import org.apache.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Await;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.net.URI;
-
 import static com.google.common.base.Charsets.UTF_8;
 import static org.junit.Assert.*;
 
@@ -60,14 +60,14 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
         ace.setDenyWrite(true);
         String zkPath = "/create-zk-access-control";
         ZKAccessControl zkac = new ZKAccessControl(ace, zkPath);
-        Await.result(zkac.create(zkc));
+        Utils.ioResult(zkac.create(zkc));
 
-        ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+        ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
         assertEquals(zkac, readZKAC);
 
         ZKAccessControl another = new ZKAccessControl(ace, zkPath);
         try {
-            Await.result(another.create(zkc));
+            FutureUtils.result(another.create(zkc));
         } catch (KeeperException.NodeExistsException ke) {
             // expected
         }
@@ -81,19 +81,19 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
         ace.setDenyDelete(true);
 
         ZKAccessControl zkac = new ZKAccessControl(ace, zkPath);
-        Await.result(zkac.create(zkc));
+        Utils.ioResult(zkac.create(zkc));
 
-        ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+        ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
         assertEquals(zkac, readZKAC);
 
-        Await.result(ZKAccessControl.delete(zkc, zkPath));
+        Utils.ioResult(ZKAccessControl.delete(zkc, zkPath));
 
         try {
-            Await.result(ZKAccessControl.read(zkc, zkPath, null));
+            FutureUtils.result(ZKAccessControl.read(zkc, zkPath, null));
         } catch (KeeperException.NoNodeException nne) {
             // expected.
         }
-        Await.result(ZKAccessControl.delete(zkc, zkPath));
+        Utils.ioResult(ZKAccessControl.delete(zkc, zkPath));
     }
 
     @Test(timeout = 60000)
@@ -102,7 +102,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
 
         zkc.get().create(zkPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT);
 
-        ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+        ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
 
         assertEquals(zkPath, readZKAC.getZKPath());
         assertEquals(ZKAccessControl.DEFAULT_ACCESS_CONTROL_ENTRY, readZKAC.getAccessControlEntry());
@@ -116,7 +116,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
         zkc.get().create(zkPath, "corrupted-data".getBytes(UTF_8), zkc.getDefaultACL(), CreateMode.PERSISTENT);
 
         try {
-            Await.result(ZKAccessControl.read(zkc, zkPath, null));
+            Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
         } catch (ZKAccessControl.CorruptedAccessControlException cace) {
             // expected
         }
@@ -130,25 +130,25 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
         ace.setDenyDelete(true);
 
         ZKAccessControl zkac = new ZKAccessControl(ace, zkPath);
-        Await.result(zkac.create(zkc));
+        Utils.ioResult(zkac.create(zkc));
 
-        ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+        ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
         assertEquals(zkac, readZKAC);
 
         ace.setDenyRelease(true);
         ZKAccessControl newZKAC = new ZKAccessControl(ace, zkPath);
-        Await.result(newZKAC.update(zkc));
-        ZKAccessControl readZKAC2 = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+        Utils.ioResult(newZKAC.update(zkc));
+        ZKAccessControl readZKAC2 = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
         assertEquals(newZKAC, readZKAC2);
 
         try {
-            Await.result(readZKAC.update(zkc));
+            FutureUtils.result(readZKAC.update(zkc));
         } catch (KeeperException.BadVersionException bve) {
             // expected
         }
         readZKAC2.getAccessControlEntry().setDenyTruncate(true);
-        Await.result(readZKAC2.update(zkc));
-        ZKAccessControl readZKAC3 = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+        Utils.ioResult(readZKAC2.update(zkc));
+        ZKAccessControl readZKAC3 = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
         assertEquals(readZKAC2, readZKAC3);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java
index 19c301b..868549e 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java
@@ -25,7 +25,7 @@ import org.apache.distributedlog.ZooKeeperClusterTestCase;
 import org.apache.distributedlog.impl.acl.ZKAccessControl;
 import org.apache.distributedlog.impl.acl.ZKAccessControlManager;
 import org.apache.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Await;
+import org.apache.distributedlog.util.Utils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -136,7 +136,7 @@ public class TestZKAccessControlManager extends ZooKeeperClusterTestCase {
             verifyStreamPermissions(zkcm, stream2, true, false, true, true, true);
 
             // delete stream2
-            Await.result(ZKAccessControl.delete(zkc, zkPath2));
+            Utils.ioResult(ZKAccessControl.delete(zkc, zkPath2));
             logger.info("Delete ACL for stream {}", stream2);
             while (!zkcm.allowTruncate(stream2)) {
                 Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
index 4f968b6..8a2c476 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
@@ -17,21 +17,20 @@
  */
 package org.apache.distributedlog.admin;
 
-import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
 import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.junit.After;
@@ -105,7 +104,7 @@ public class TestDLCK extends TestDistributedLogBase {
         confLocal.setLogSegmentCacheEnabled(false);
         URI uri = createDLMURI("/check-and-repair-dl-namespace");
         zkc.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(confLocal)
                 .uri(uri)
                 .build();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
index f911f15..f7f859c 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
@@ -18,14 +18,14 @@
 package org.apache.distributedlog.admin;
 
 import java.net.URI;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
 import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -36,19 +36,16 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogReader;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
 import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
 
 import static org.junit.Assert.*;
 
@@ -92,11 +89,11 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
 
         URI uri = createDLMURI("/change-sequence-number");
         zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(confLocal)
                 .uri(uri)
                 .build();
-        DistributedLogNamespace readNamespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace readNamespace = NamespaceBuilder.newBuilder()
                 .conf(readConf)
                 .uri(uri)
                 .build();
@@ -117,7 +114,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
         long expectedTxId = 1L;
         DLSN lastDLSN = DLSN.InitialDLSN;
         for (int i = 0; i < 4 * 10; i++) {
-            LogRecordWithDLSN record = Await.result(reader.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
             assertNotNull(record);
             DLMTestUtil.verifyLogRecord(record);
             assertEquals(expectedTxId, record.getTransactionId());
@@ -133,9 +130,9 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
         LOG.info("Injected bad log segment '3'");
 
         // there isn't records should be read
-        Future<LogRecordWithDLSN> readFuture = reader.readNext();
+        CompletableFuture<LogRecordWithDLSN> readFuture = reader.readNext();
         try {
-            LogRecordWithDLSN record = Await.result(readFuture);
+            LogRecordWithDLSN record = Utils.ioResult(readFuture);
             fail("Should fail reading next record "
                     + record
                     + " when there is a corrupted log segment");
@@ -151,7 +148,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
 
         try {
             reader = readDLM.getAsyncLogReader(lastDLSN);
-            Await.result(reader.readNext());
+            Utils.ioResult(reader.readNext());
             fail("Should fail reading next when there is a corrupted log segment");
         } catch (UnexpectedException ue) {
             // expected
@@ -166,18 +163,18 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
         // be able to read more after fix
         reader = readDLM.getAsyncLogReader(lastDLSN);
         // skip the first record
-        Await.result(reader.readNext());
+        Utils.ioResult(reader.readNext());
         readFuture = reader.readNext();
 
         expectedTxId = 51L;
-        LogRecord record = Await.result(readFuture);
+        LogRecord record = Utils.ioResult(readFuture);
         assertNotNull(record);
         DLMTestUtil.verifyLogRecord(record);
         assertEquals(expectedTxId, record.getTransactionId());
         expectedTxId++;
 
         for (int i = 1; i < 10; i++) {
-            record = Await.result(reader.readNext());
+            record = Utils.ioResult(reader.readNext());
             assertNotNull(record);
             DLMTestUtil.verifyLogRecord(record);
             assertEquals(expectedTxId, record.getTransactionId());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
index 2492c06..925cad5 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
@@ -17,22 +17,21 @@
  */
 package org.apache.distributedlog.bk;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.BookKeeperClientBuilder;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
 import org.apache.distributedlog.bk.SimpleLedgerAllocator.AllocationException;
 import org.apache.distributedlog.bk.SimpleLedgerAllocator.Phase;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.Transaction.OpListener;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.zk.DefaultZKOp;
 import org.apache.distributedlog.zk.ZKTransaction;
-import com.twitter.util.Future;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -53,7 +52,6 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.Enumeration;
 import java.util.HashSet;
@@ -116,13 +114,13 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         return new ZKTransaction(zkc);
     }
 
-    private SimpleLedgerAllocator createAllocator(String allocationPath) throws IOException {
+    private SimpleLedgerAllocator createAllocator(String allocationPath) throws Exception {
         return createAllocator(allocationPath, dlConf);
     }
 
     private SimpleLedgerAllocator createAllocator(String allocationPath,
-                                                  DistributedLogConfiguration conf) throws IOException {
-        return FutureUtils.result(SimpleLedgerAllocator.of(allocationPath, null, newQuorumConfigProvider(conf), zkc, bkc));
+                                                  DistributedLogConfiguration conf) throws Exception {
+        return Utils.ioResult(SimpleLedgerAllocator.of(allocationPath, null, newQuorumConfigProvider(conf), zkc, bkc));
     }
 
     /**
@@ -136,13 +134,13 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         SimpleLedgerAllocator allocator = createAllocator(allocationPath);
         allocator.allocate();
         ZKTransaction txn = newTxn();
-        LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
+        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
         logger.info("Try obtaining ledger handle {}", lh.getId());
         byte[] data = zkc.get().getData(allocationPath, false, null);
         assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
         txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
         try {
-            FutureUtils.result(txn.execute());
+            Utils.ioResult(txn.execute());
             fail("Should fail the transaction when setting unexisted path");
         } catch (ZKException ke) {
             // expected
@@ -154,9 +152,9 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         // Create new transaction to obtain the ledger again.
         txn = newTxn();
         // we could obtain the ledger if it was obtained
-        LedgerHandle newLh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
+        LedgerHandle newLh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
         assertEquals(lh.getId(), newLh.getId());
-        FutureUtils.result(txn.execute());
+        Utils.ioResult(txn.execute());
         data = zkc.get().getData(allocationPath, false, null);
         assertEquals(0, data.length);
         Utils.close(allocator);
@@ -177,16 +175,16 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         allocator1.allocate();
         // wait until allocated
         ZKTransaction txn1 = newTxn();
-        LedgerHandle lh = FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER));
+        LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
         allocator2.allocate();
         ZKTransaction txn2 = newTxn();
         try {
-            FutureUtils.result(allocator2.tryObtain(txn2, NULL_LISTENER));
+            Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
             fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
-        } catch (ZKException zke) {
-            assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
+        } catch (ZKException ke) {
+            assertEquals(KeeperException.Code.BADVERSION, ke.getKeeperExceptionCode());
         }
-        FutureUtils.result(txn1.execute());
+        Utils.ioResult(txn1.execute());
         Utils.close(allocator1);
         Utils.close(allocator2);
 
@@ -217,7 +215,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         ZKTransaction txn1 = newTxn();
 
         try {
-            FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER));
+            Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
             fail("Should fail allocating ledger if there aren't enough bookies");
         } catch (AllocationException ioe) {
             // expected
@@ -241,7 +239,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         allocator1.allocate();
         // wait until allocated
         ZKTransaction txn1 = newTxn();
-        LedgerHandle lh1 = FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER));
+        LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
 
         // Second allocator kicks in
         stat = new Stat();
@@ -252,16 +250,16 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         allocator2.allocate();
         // wait until allocated
         ZKTransaction txn2 = newTxn();
-        LedgerHandle lh2 = FutureUtils.result(allocator2.tryObtain(txn2, NULL_LISTENER));
+        LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
 
         // should fail to commit txn1 as version is changed by second allocator
         try {
-            FutureUtils.result(txn1.execute());
+            Utils.ioResult(txn1.execute());
             fail("Should fail commit obtaining ledger handle from first allocator as allocator is modified by second allocator.");
         } catch (ZKException ke) {
             // as expected
         }
-        FutureUtils.result(txn2.execute());
+        Utils.ioResult(txn2.execute());
         Utils.close(allocator1);
         Utils.close(allocator2);
 
@@ -298,7 +296,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         allocator.allocate();
         ZKTransaction txn = newTxn();
         // close during obtaining ledger.
-        LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
+        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
         Utils.close(allocator);
         byte[] data = zkc.get().getData(allocationPath, false, null);
         assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
@@ -319,8 +317,8 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         allocator.allocate();
         ZKTransaction txn = newTxn();
         // close during obtaining ledger.
-        LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
-        FutureUtils.result(txn.execute());
+        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+        Utils.ioResult(txn.execute());
         Utils.close(allocator);
         byte[] data = zkc.get().getData(allocationPath, false, null);
         assertEquals(0, data.length);
@@ -336,10 +334,10 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         allocator.allocate();
         ZKTransaction txn = newTxn();
         // close during obtaining ledger.
-        LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
+        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
         txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
         try {
-            FutureUtils.result(txn.execute());
+            Utils.ioResult(txn.execute());
             fail("Should fail the transaction when setting unexisted path");
         } catch (ZKException ke) {
             // expected
@@ -358,13 +356,13 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         SimpleLedgerAllocator allocator = createAllocator(allcationPath);
         allocator.allocate();
         ZKTransaction txn1 = newTxn();
-        Future<LedgerHandle> obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER);
+        CompletableFuture<LedgerHandle> obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER);
         ZKTransaction txn2 = newTxn();
-        Future<LedgerHandle> obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER);
-        assertTrue(obtainFuture2.isDefined());
-        assertTrue(obtainFuture2.isThrow());
+        CompletableFuture<LedgerHandle> obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER);
+        assertTrue(obtainFuture2.isDone());
+        assertTrue(obtainFuture2.isCompletedExceptionally());
         try {
-            FutureUtils.result(obtainFuture2);
+            Utils.ioResult(obtainFuture2);
             fail("Should fail the concurrent obtain since there is already a transaction obtaining the ledger handle");
         } catch (SimpleLedgerAllocator.ConcurrentObtainException cbe) {
             // expected
@@ -380,8 +378,8 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         for (int i = 0; i < numLedgers; i++) {
             allocator.allocate();
             ZKTransaction txn = newTxn();
-            LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
-            FutureUtils.result(txn.execute());
+            LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+            Utils.ioResult(txn.execute());
             allocatedLedgers.add(lh);
         }
         assertEquals(numLedgers, allocatedLedgers.size());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java
index e1aaa0b..a42d688 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java
@@ -24,7 +24,6 @@ import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.Transaction.OpListener;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.zk.ZKTransaction;
@@ -127,7 +126,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
         for (int i = 0; i < numAllocators; i++) {
             try {
                 pool.allocate();
-                FutureUtils.result(pool.tryObtain(newTxn(), NULL_LISTENER));
+                Utils.ioResult(pool.tryObtain(newTxn(), NULL_LISTENER));
                 fail("Should fail to allocate ledger if there are enought bookies");
             } catch (SimpleLedgerAllocator.AllocationException ae) {
                 assertEquals(SimpleLedgerAllocator.Phase.ERROR, ae.getPhase());
@@ -136,7 +135,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
         for (int i = 0; i < numAllocators; i++) {
             try {
                 pool.allocate();
-                FutureUtils.result(pool.tryObtain(newTxn(), NULL_LISTENER));
+                Utils.ioResult(pool.tryObtain(newTxn(), NULL_LISTENER));
                 fail("Should fail to allocate ledger if there aren't available allocators");
             } catch (SimpleLedgerAllocator.AllocationException ae) {
                 assertEquals(SimpleLedgerAllocator.Phase.ERROR, ae.getPhase());
@@ -159,7 +158,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
         for (int i = 0; i < numAllocators; i++) {
             ZKTransaction txn = newTxn();
             pool.allocate();
-            LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
+            LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
 
             // get the corresponding ledger allocator
             SimpleLedgerAllocator sla = pool.getLedgerAllocator(lh);
@@ -176,7 +175,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
             String slaPath = allocatePaths.get(i);
 
             // execute the transaction to confirm/abort obtain
-            FutureUtils.result(txn.execute());
+            Utils.ioResult(txn.execute());
 
             // introduce error to individual ledger allocator
             byte[] data = zkc.get().getData(slaPath, false, new Stat());
@@ -188,7 +187,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
             try {
                 pool.allocate();
                 ZKTransaction txn = newTxn();
-                LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
+                LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
 
                 // get the corresponding ledger allocator
                 SimpleLedgerAllocator sla = pool.getLedgerAllocator(lh);
@@ -197,7 +196,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
                 logger.info("Allocated ledger {} from path {}", lh.getId(), slaPath);
                 allocatedPathSet.add(slaPath);
 
-                FutureUtils.result(txn.execute());
+                Utils.ioResult(txn.execute());
                 ++numSuccess;
             } catch (IOException ioe) {
                 // continue
@@ -229,7 +228,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
         LedgerAllocatorPool pool = new LedgerAllocatorPool(allocationPath, 0, dlConf, zkc, bkc, allocationExecutor);
         ZKTransaction txn = newTxn();
         try {
-            FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
+            Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
             fail("Should fail obtain ledger handle if there is no allocator.");
         } catch (SimpleLedgerAllocator.AllocationException ae) {
             fail("Should fail obtain ledger handle if there is no allocator.");
@@ -251,8 +250,8 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
         for (int i = 0; i < numLedgers; i++) {
             pool.allocate();
             ZKTransaction txn = newTxn();
-            LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
-            FutureUtils.result(txn.execute());
+            LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
+            Utils.ioResult(txn.execute());
             allocatedLedgers.add(lh);
         }
         assertEquals(numLedgers, allocatedLedgers.size());
@@ -280,8 +279,8 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
                         for (int i = 0; i < numLedgers; i++) {
                             pool.allocate();
                             ZKTransaction txn = newTxn();
-                            LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
-                            FutureUtils.result(txn.execute());
+                            LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
+                            Utils.ioResult(txn.execute());
                             lh.close();
                             allocatedLedgers.putIfAbsent(lh.getId(), lh);
                             logger.info("[thread {}] allocate {}th ledger {}",

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java
index f371007..5efa7e4 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,10 +20,12 @@ package org.apache.distributedlog.config;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.util.Properties;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Writer to write properties to files.
+ */
 public class PropertiesWriter {
     static final Logger LOG = LoggerFactory.getLogger(PropertiesWriter.class);
 
@@ -57,7 +59,7 @@ public class PropertiesWriter {
     public void save() throws Exception {
         FileOutputStream outputStream = new FileOutputStream(configFile);
         properties.store(outputStream, null);
-        configFile.setLastModified(configFile.lastModified()+1000);
+        configFile.setLastModified(configFile.lastModified() + 1000);
         LOG.debug("save modified={}", configFile.lastModified());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java
deleted file mode 100644
index 9563511..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-public class TestConcurrentBaseConfiguration {
-    static final Logger LOG = LoggerFactory.getLogger(TestConcurrentBaseConfiguration.class);
-
-    @Test(timeout = 20000)
-    public void testBasicOperations() throws Exception {
-        ConcurrentBaseConfiguration conf = new ConcurrentBaseConfiguration();
-        conf.setProperty("prop1", "1");
-        assertEquals(1, conf.getInt("prop1"));
-        conf.setProperty("prop1", "2");
-        assertEquals(2, conf.getInt("prop1"));
-        conf.clearProperty("prop1");
-        assertEquals(null, conf.getInteger("prop1", null));
-        conf.setProperty("prop1", "1");
-        conf.setProperty("prop2", "2");
-        assertEquals(1, conf.getInt("prop1"));
-        assertEquals(2, conf.getInt("prop2"));
-        conf.clearProperty("prop1");
-        assertEquals(null, conf.getInteger("prop1", null));
-        assertEquals(2, conf.getInt("prop2"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java
deleted file mode 100644
index 8420a97..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.commons.configuration.event.ConfigurationEvent;
-import org.apache.commons.configuration.event.ConfigurationListener;
-import org.jmock.lib.concurrent.DeterministicScheduler;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.*;
-
-/**
- * Notes:
- * 1. lastModified granularity is platform dependent, generally 1 sec, so we can't wait 1ms for things to
- * get picked up.
- */
-public class TestConfigurationSubscription {
-    static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class);
-
-    /**
-     * Give FileChangedReloadingStrategy some time to start reloading
-     * Make sure now!=lastChecked
-     * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()}
-     */
-    private void ensureConfigReloaded() throws InterruptedException {
-        // sleep 1 ms so that System.currentTimeMillis() !=
-        // lastChecked (the time we construct FileChangedReloadingStrategy
-        Thread.sleep(1);
-    }
-
-    @Test(timeout = 60000)
-    public void testReloadConfiguration() throws Exception {
-        PropertiesWriter writer = new PropertiesWriter();
-        FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
-        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
-        DeterministicScheduler executorService = new DeterministicScheduler();
-        List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
-        ConfigurationSubscription confSub =
-                new ConfigurationSubscription(conf, fileConfigBuilders, executorService, 100, TimeUnit.MILLISECONDS);
-        final AtomicReference<ConcurrentBaseConfiguration> confHolder = new AtomicReference<>();
-        confSub.registerListener(new org.apache.distributedlog.config.ConfigurationListener() {
-            @Override
-            public void onReload(ConcurrentBaseConfiguration conf) {
-                confHolder.set(conf);
-            }
-        });
-        assertEquals(null, conf.getProperty("prop1"));
-
-        // add
-        writer.setProperty("prop1", "1");
-        writer.save();
-        // ensure the file change reloading event can be triggered
-        ensureConfigReloaded();
-        // reload the config
-        confSub.reload();
-        assertNotNull(confHolder.get());
-        assertTrue(conf == confHolder.get());
-        assertEquals("1", conf.getProperty("prop1"));
-    }
-
-    @Test(timeout = 60000)
-    public void testAddReloadBasicsConfig() throws Exception {
-        PropertiesWriter writer = new PropertiesWriter();
-        DeterministicScheduler mockScheduler = new DeterministicScheduler();
-        FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
-        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
-        List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
-        ConfigurationSubscription confSub =
-                new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
-        assertEquals(null, conf.getProperty("prop1"));
-
-        // add
-        writer.setProperty("prop1", "1");
-        writer.save();
-        // ensure the file change reloading event can be triggered
-        ensureConfigReloaded();
-        mockScheduler.tick(100, TimeUnit.MILLISECONDS);
-        assertEquals("1", conf.getProperty("prop1"));
-
-    }
-
-    @Test(timeout = 60000)
-    public void testInitialConfigLoad() throws Exception {
-        PropertiesWriter writer = new PropertiesWriter();
-        writer.setProperty("prop1", "1");
-        writer.setProperty("prop2", "abc");
-        writer.setProperty("prop3", "123.0");
-        writer.setProperty("prop4", "11132");
-        writer.setProperty("prop5", "true");
-        writer.save();
-
-        ScheduledExecutorService mockScheduler = new DeterministicScheduler();
-        FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
-        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
-        List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
-        ConfigurationSubscription confSub =
-                new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
-        assertEquals(1, conf.getInt("prop1"));
-        assertEquals("abc", conf.getString("prop2"));
-        assertEquals(123.0, conf.getFloat("prop3"), 0);
-        assertEquals(11132, conf.getInt("prop4"));
-        assertEquals(true, conf.getBoolean("prop5"));
-    }
-
-    @Test(timeout = 60000)
-    public void testExceptionInConfigLoad() throws Exception {
-        PropertiesWriter writer = new PropertiesWriter();
-        writer.setProperty("prop1", "1");
-        writer.save();
-
-        DeterministicScheduler mockScheduler = new DeterministicScheduler();
-        FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
-        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
-        List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
-        ConfigurationSubscription confSub =
-                new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
-
-        final AtomicInteger count = new AtomicInteger(1);
-        conf.addConfigurationListener(new ConfigurationListener() {
-            @Override
-            public void configurationChanged(ConfigurationEvent event) {
-                LOG.info("config changed {}", event);
-                // Throw after so we actually see the update anyway.
-                if (!event.isBeforeUpdate()) {
-                    count.getAndIncrement();
-                    throw new RuntimeException("config listener threw and exception");
-                }
-            }
-        });
-
-        int i = 0;
-        int initial = 0;
-        while (count.get() == initial) {
-            writer.setProperty("prop1", Integer.toString(i++));
-            writer.save();
-            mockScheduler.tick(100, TimeUnit.MILLISECONDS);
-        }
-
-        initial = count.get();
-        while (count.get() == initial) {
-            writer.setProperty("prop1", Integer.toString(i++));
-            writer.save();
-            mockScheduler.tick(100, TimeUnit.MILLISECONDS);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
index b5d6300..21aa1c9 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
@@ -23,12 +23,13 @@ import com.google.common.base.Optional;
 import org.apache.distributedlog.DistributedLogConfiguration;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.configuration.ConfigurationException;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.common.config.PropertiesWriter;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java
index c1ac98a..2731af3 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.distributedlog.config;
 import org.apache.distributedlog.DistributedLogConfiguration;
 
 import org.apache.distributedlog.bk.QuorumConfig;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
 import org.junit.Test;
 
 import static org.apache.distributedlog.DistributedLogConfiguration.*;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java
index 3ce4952..1064a6f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.feature;
 
-import org.apache.distributedlog.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.SettableFeature;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
index 5d4472d..f8dd245 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
@@ -18,8 +18,8 @@
 package org.apache.distributedlog.feature;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
-import org.apache.distributedlog.config.PropertiesWriter;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.config.PropertiesWriter;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.Ignore;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
index b2fcbf6..db9fb31 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
@@ -23,7 +23,6 @@ import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
@@ -89,12 +88,12 @@ public class TestZKLogMetadataStore extends TestDistributedLogBase {
 
     @Test(timeout = 60000)
     public void testCreateLog() throws Exception {
-        assertEquals(uri, FutureUtils.result(metadataStore.createLog("test")));
+        assertEquals(uri, Utils.ioResult(metadataStore.createLog("test")));
     }
 
     @Test(timeout = 60000)
     public void testGetLogLocation() throws Exception {
-        Optional<URI> uriOptional = FutureUtils.result(metadataStore.getLogLocation("test"));
+        Optional<URI> uriOptional = Utils.ioResult(metadataStore.getLogLocation("test"));
         assertTrue(uriOptional.isPresent());
         assertEquals(uri, uriOptional.get());
     }
@@ -107,7 +106,7 @@ public class TestZKLogMetadataStore extends TestDistributedLogBase {
             logs.add(logName);
             createLogInNamespace(uri, logName);
         }
-        Set<String> result = Sets.newHashSet(FutureUtils.result(metadataStore.getLogs()));
+        Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs()));
         assertEquals(10, result.size());
         assertTrue(Sets.difference(logs, result).isEmpty());
     }