You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:35 UTC

[03/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
index 4d4a008..5189104 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
@@ -17,6 +17,7 @@
  */
 package org.apache.distributedlog.lock;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -27,11 +28,10 @@ import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.lock.ZKSessionLock.State;
 import org.apache.distributedlog.util.FailpointUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Await;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -43,7 +43,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -149,9 +148,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         String node2 = getLockIdFromPath(createLockNodeV2(zk, lockPath, clientId));
         String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId));
 
-        assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node1)));
-        assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node2)));
-        assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node3)));
+        assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node1)));
+        assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node2)));
+        assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node3)));
 
         // Bad Lock Node Name
         String node4 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member"));
@@ -160,15 +159,15 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         String node7 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode"));
         String node8 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode_badnode"));
 
-        assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node4)));
-        assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node5)));
-        assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node6)));
-        assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node7)));
-        assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node8)));
+        assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node4)));
+        assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node5)));
+        assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node6)));
+        assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node7)));
+        assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node8)));
 
         // Malformed Node Name
         String node9 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_malformed_s12345678_999999"));
-        assertEquals(Pair.of("malformed", 12345678L), Await.result(asyncParseClientID(zk, lockPath, node9)));
+        assertEquals(Pair.of("malformed", 12345678L), Utils.ioResult(asyncParseClientID(zk, lockPath, node9)));
     }
 
     @Test(timeout = 60000)
@@ -256,7 +255,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals("counter should not be increased in different epochs", 1, counter.get());
 
         // lock action would not be executed in same epoch and promise would be satisfied with exception
-        Promise<BoxedUnit> promise = new Promise<BoxedUnit>();
+        CompletableFuture<Void> promise = new CompletableFuture<Void>();
         lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
             @Override
             public void execute() {
@@ -269,7 +268,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
             }
         }, promise);
         try {
-            Await.result(promise);
+            Utils.ioResult(promise);
             fail("Should satisfy promise with epoch changed exception.");
         } catch (EpochChangedException ece) {
             // expected
@@ -457,7 +456,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLAIMED, lock.getLockState());
         List<String> children = getLockWaiters(zkc, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+        assertEquals(lock.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
 
         // lock should fail on a success lock
         try {
@@ -469,7 +468,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLAIMED, lock.getLockState());
         children = getLockWaiters(zkc, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+        assertEquals(lock.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
 
         // unlock
         lock.unlock();
@@ -546,7 +545,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLAIMED, lock0.getLockState());
         List<String> children = getLockWaiters(zkc0, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
 
         try {
             lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
@@ -559,7 +558,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLOSED, lock1.getLockState());
         children = getLockWaiters(zkc0, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
 
         lock0.unlock();
         // verification after unlock lock0
@@ -574,7 +573,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLAIMED, lock2.getLockState());
         children = getLockWaiters(zkc, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock2.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+        assertEquals(lock2.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
 
         lock2.unlock();
     }
@@ -649,7 +648,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLAIMED, lock0.getLockState());
         List<String> children = getLockWaiters(zkc0, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
 
         final CountDownLatch lock1DoneLatch = new CountDownLatch(1);
 
@@ -687,7 +686,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLAIMED, lock1.getLockState());
         children = getLockWaiters(zkc, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+        assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
 
         lock1.unlock();
     }
@@ -719,7 +718,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLAIMED, lock.getLockState());
         List<String> children = getLockWaiters(zkc, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+        assertEquals(lock.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
 
         ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
         expiredLatch.await();
@@ -806,7 +805,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLAIMED, lock0.getLockState());
         List<String> children = getLockWaiters(zkc0, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
 
         final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);
         final CountDownLatch lock1DoneLatch = new CountDownLatch(1);
@@ -830,9 +829,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
 
         assertEquals(2, children.size());
         assertEquals(State.CLAIMED, lock0.getLockState());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
         awaitState(State.WAITING, lock1);
-        assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+        assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
 
         // expire lock1
         ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
@@ -843,7 +842,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLOSED, lock1.getLockState());
         children = getLockWaiters(zkc0, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
     }
 
     public void awaitState(State state, ZKSessionLock lock) throws InterruptedException {
@@ -891,7 +890,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLOSED, lock1_0.getLockState());
         List<String> children = getLockWaiters(zkc0, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
 
         // lock1_1 would wait the ownership
         final ZKSessionLock lock1_1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
@@ -917,9 +916,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
 
         assertEquals(2, children.size());
         assertEquals(State.CLAIMED, lock0.getLockState());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
         awaitState(State.WAITING, lock1_1);
-        assertEquals(lock1_1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+        assertEquals(lock1_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
 
         if (isUnlock) {
             lock0.unlock();
@@ -938,7 +937,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         assertEquals(State.CLAIMED, lock1_1.getLockState());
         children = getLockWaiters(zkc, lockPath);
         assertEquals(1, children.size());
-        assertEquals(lock1_1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+        assertEquals(lock1_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
 
         lock1_1.unlock();
     }
@@ -1040,9 +1039,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
 
         assertEquals(2, children.size());
         assertEquals(State.CLAIMED, lock0_0.getLockState());
-        assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0_0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
         awaitState(State.WAITING, lock1);
-        assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+        assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
 
         final CountDownLatch lock0DoneLatch = new CountDownLatch(1);
         final AtomicReference<String> ownerFromLock0 = new AtomicReference<String>(null);
@@ -1058,9 +1057,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
             children = getLockWaiters(zkc, lockPath);
             assertEquals(2, children.size());
             assertEquals(State.CLAIMED, lock0_0.getLockState());
-            assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+            assertEquals(lock0_0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
             assertEquals(State.WAITING, lock1.getLockState());
-            assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+            assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
         } else {
             lock0Thread = new Thread(new Runnable() {
                 @Override
@@ -1087,11 +1086,11 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
 
             assertEquals(3, children.size());
             assertEquals(State.CLAIMED, lock0_0.getLockState());
-            assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+            assertEquals(lock0_0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
             awaitState(State.WAITING, lock1);
-            assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+            assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
             awaitState(State.WAITING, lock0_1);
-            assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(2))));
+            assertEquals(lock0_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(2))));
         }
 
         if (isUnlock) {
@@ -1114,7 +1113,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
             children = getLockWaiters(zkc, lockPath);
             assertEquals(1, children.size());
             assertEquals(State.CLAIMED, lock1.getLockState());
-            assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+            assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
         } else {
             assertNotNull(lock0Thread);
             if (!isUnlock) {
@@ -1128,14 +1127,14 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
                 children = getLockWaiters(zkc, lockPath);
                 assertEquals(1, children.size());
                 assertEquals(State.CLAIMED, lock1.getLockState());
-                assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+                assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
             } else {
                 children = getLockWaiters(zkc, lockPath);
                 assertEquals(2, children.size());
                 assertEquals(State.CLAIMED, lock1.getLockState());
-                assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+                assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
                 assertEquals(State.WAITING, lock0_1.getLockState());
-                assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(1))));
+                assertEquals(lock0_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(1))));
             }
         }
 
@@ -1148,7 +1147,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
             children = getLockWaiters(zkc, lockPath);
             assertEquals(1, children.size());
             assertEquals(State.CLAIMED, lock0_1.getLockState());
-            assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+            assertEquals(lock0_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
         }
     }
 
@@ -1186,15 +1185,15 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         List<String> children = getLockWaiters(zkc0, lockPath);
         assertEquals(1, children.size());
         assertEquals(State.CLAIMED, lock0.getLockState());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
 
         lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
         children = getLockWaiters(zkc0, lockPath);
         assertEquals(2, children.size());
         assertEquals(State.CLAIMED, lock0.getLockState());
-        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+        assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
         assertEquals(State.CLAIMED, lock1.getLockState());
-        assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(1))));
+        assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(1))));
 
         if (isUnlock) {
             lock0.unlock();
@@ -1202,7 +1201,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
             children = getLockWaiters(zkc0, lockPath);
             assertEquals(1, children.size());
             assertEquals(State.CLAIMED, lock1.getLockState());
-            assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+            assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
             lock1.unlock();
         } else {
             ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java b/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java
index 5943b64..6687b7b 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.logsegment;
 
-import org.apache.distributedlog.util.Sizable;
+import org.apache.distributedlog.common.util.Sizable;
 import org.junit.Test;
 
 import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
index e18fb3f..2090828 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
@@ -24,12 +24,11 @@ import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.ZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClusterTestCase;
 import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.junit.After;
@@ -105,7 +104,7 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
 
         // Dryrun
         MetadataUpdater dryrunUpdater = new DryrunLogSegmentMetadataStoreUpdater(conf, metadataStore);
-        FutureUtils.result(dryrunUpdater.changeSequenceNumber(segment, 6L));
+        Utils.ioResult(dryrunUpdater.changeSequenceNumber(segment, 6L));
 
         segmentList = readLogSegments(ledgerPath);
         assertEquals(5, segmentList.size());
@@ -113,7 +112,7 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
         // Fix the inprogress log segments
 
         MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
-        FutureUtils.result(updater.changeSequenceNumber(segment, 6L));
+        Utils.ioResult(updater.changeSequenceNumber(segment, 6L));
 
         segmentList = readLogSegments(ledgerPath);
         assertEquals(6, segmentList.size());
@@ -156,19 +155,19 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
         // Dryrun
         MetadataUpdater dryrunUpdater = new DryrunLogSegmentMetadataStoreUpdater(conf, metadataStore);
         try {
-            FutureUtils.result(dryrunUpdater.updateLastRecord(completedLogSegment, badRecord));
+            Utils.ioResult(dryrunUpdater.updateLastRecord(completedLogSegment, badRecord));
             fail("Should fail on updating dlsn that in different log segment");
         } catch (IllegalArgumentException iae) {
             // expected
         }
         try {
-            FutureUtils.result(dryrunUpdater.updateLastRecord(inprogressLogSegment, goodRecord2));
+            Utils.ioResult(dryrunUpdater.updateLastRecord(inprogressLogSegment, goodRecord2));
             fail("Should fail on updating dlsn for an inprogress log segment");
         } catch (IllegalStateException ise) {
             // expected
         }
         LogSegmentMetadata updatedCompletedLogSegment =
-                FutureUtils.result(dryrunUpdater.updateLastRecord(completedLogSegment, goodRecord1));
+                Utils.ioResult(dryrunUpdater.updateLastRecord(completedLogSegment, goodRecord1));
         assertEquals(goodLastDLSN1, updatedCompletedLogSegment.getLastDLSN());
         assertEquals(goodRecord1.getTransactionId(), updatedCompletedLogSegment.getLastTxId());
         assertTrue(updatedCompletedLogSegment.isRecordLastPositioninThisSegment(goodRecord1));
@@ -187,18 +186,18 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
         // Fix the last dlsn
         MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
         try {
-            FutureUtils.result(updater.updateLastRecord(completedLogSegment, badRecord));
+            Utils.ioResult(updater.updateLastRecord(completedLogSegment, badRecord));
             fail("Should fail on updating dlsn that in different log segment");
         } catch (IllegalArgumentException iae) {
             // expected
         }
         try {
-            FutureUtils.result(updater.updateLastRecord(inprogressLogSegment, goodRecord2));
+            Utils.ioResult(updater.updateLastRecord(inprogressLogSegment, goodRecord2));
             fail("Should fail on updating dlsn for an inprogress log segment");
         } catch (IllegalStateException ise) {
             // expected
         }
-        updatedCompletedLogSegment = FutureUtils.result(updater.updateLastRecord(completedLogSegment, goodRecord1));
+        updatedCompletedLogSegment = Utils.ioResult(updater.updateLastRecord(completedLogSegment, goodRecord1));
         assertEquals(goodLastDLSN1, updatedCompletedLogSegment.getLastDLSN());
         assertEquals(goodRecord1.getTransactionId(), updatedCompletedLogSegment.getLastTxId());
         assertTrue(updatedCompletedLogSegment.isRecordLastPositioninThisSegment(goodRecord1));
@@ -245,28 +244,28 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
 
         // Dryrun
         MetadataUpdater dryrunUpdater = new DryrunLogSegmentMetadataStoreUpdater(conf, metadataStore);
-        FutureUtils.result(dryrunUpdater.setLogSegmentTruncated(segmentList.get(segmentToModify)));
+        Utils.ioResult(dryrunUpdater.setLogSegmentTruncated(segmentList.get(segmentToModify)));
 
         segmentList = readLogSegments(ledgerPath);
         assertEquals(false, segmentList.get(segmentToModify).isTruncated());
 
         // change truncation for the 1st log segment
         MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
-        FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(segmentToModify)));
+        Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(segmentToModify)));
 
         segmentList = readLogSegments(ledgerPath);
         assertEquals(true, segmentList.get(segmentToModify).isTruncated());
         assertEquals(false, segmentList.get(segmentToModify).isPartiallyTruncated());
 
         updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
-        FutureUtils.result(updater.setLogSegmentActive(segmentList.get(segmentToModify)));
+        Utils.ioResult(updater.setLogSegmentActive(segmentList.get(segmentToModify)));
 
         segmentList = readLogSegments(ledgerPath);
         assertEquals(false, segmentList.get(segmentToModify).isTruncated());
         assertEquals(false, segmentList.get(segmentToModify).isPartiallyTruncated());
 
         updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
-        FutureUtils.result(updater.setLogSegmentPartiallyTruncated(segmentList.get(segmentToModify),
+        Utils.ioResult(updater.setLogSegmentPartiallyTruncated(segmentList.get(segmentToModify),
                 segmentList.get(segmentToModify).getFirstDLSN()));
 
         segmentList = readLogSegments(ledgerPath);
@@ -274,7 +273,7 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
         assertEquals(true, segmentList.get(segmentToModify).isPartiallyTruncated());
 
         updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
-        FutureUtils.result(updater.setLogSegmentActive(segmentList.get(segmentToModify)));
+        Utils.ioResult(updater.setLogSegmentActive(segmentList.get(segmentToModify)));
 
         segmentList = readLogSegments(ledgerPath);
         assertEquals(false, segmentList.get(segmentToModify).isTruncated());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java b/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
deleted file mode 100644
index 46a3a6f..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.namespace;
-
-import org.apache.distributedlog.BKDistributedLogNamespace;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.TestDistributedLogBase;
-import org.junit.Test;
-
-import java.net.URI;
-
-import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test Namespace Builder
- */
-public class TestDistributedLogNamespaceBuilder extends TestDistributedLogBase {
-
-    @Test(timeout = 60000, expected = NullPointerException.class)
-    public void testEmptyBuilder() throws Exception {
-        DistributedLogNamespaceBuilder.newBuilder().build();
-    }
-
-    @Test(timeout = 60000, expected = NullPointerException.class)
-    public void testMissingUri() throws Exception {
-        DistributedLogNamespaceBuilder.newBuilder()
-                .conf(new DistributedLogConfiguration())
-                .build();
-    }
-
-    @Test(timeout = 60000, expected = NullPointerException.class)
-    public void testMissingSchemeInUri() throws Exception {
-        DistributedLogNamespaceBuilder.newBuilder()
-                .conf(new DistributedLogConfiguration())
-                .uri(new URI("/test"))
-                .build();
-    }
-
-    @Test(timeout = 60000, expected = IllegalArgumentException.class)
-    public void testInvalidSchemeInUri() throws Exception {
-        DistributedLogNamespaceBuilder.newBuilder()
-                .conf(new DistributedLogConfiguration())
-                .uri(new URI("dist://invalid/scheme/in/uri"))
-                .build();
-    }
-
-    @Test(timeout = 60000, expected = IllegalArgumentException.class)
-    public void testInvalidSchemeCorrectBackendInUri() throws Exception {
-        DistributedLogNamespaceBuilder.newBuilder()
-                .conf(new DistributedLogConfiguration())
-                .uri(new URI("dist-bk://invalid/scheme/in/uri"))
-                .build();
-    }
-
-    @Test(timeout = 60000, expected = IllegalArgumentException.class)
-    public void testUnknownBackendInUri() throws Exception {
-        DistributedLogNamespaceBuilder.newBuilder()
-                .conf(new DistributedLogConfiguration())
-                .uri(new URI("distributedlog-unknown://invalid/scheme/in/uri"))
-                .build();
-    }
-
-    @Test(timeout = 60000, expected = NullPointerException.class)
-    public void testNullStatsLogger() throws Exception {
-        DistributedLogNamespaceBuilder.newBuilder()
-                .conf(new DistributedLogConfiguration())
-                .uri(new URI("distributedlog-bk://localhost/distributedlog"))
-                .statsLogger(null)
-                .build();
-    }
-
-    @Test(timeout = 60000, expected = NullPointerException.class)
-    public void testNullClientId() throws Exception {
-        DistributedLogNamespaceBuilder.newBuilder()
-                .conf(new DistributedLogConfiguration())
-                .uri(new URI("distributedlog-bk://localhost/distributedlog"))
-                .clientId(null)
-                .build();
-    }
-
-    @Test(timeout = 60000)
-    public void testBuildBKDistributedLogNamespace() throws Exception {
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
-                .conf(new DistributedLogConfiguration())
-                .uri(new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace"))
-                .build();
-        try {
-            assertTrue("distributedlog-bk:// should build bookkeeper based distributedlog namespace",
-                    namespace instanceof BKDistributedLogNamespace);
-        } finally {
-            namespace.close();
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testBuildWhenMissingBackendInUri() throws Exception {
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
-                .conf(new DistributedLogConfiguration())
-                .uri(new URI("distributedlog://" + zkServers + DLOG_NAMESPACE + "/defaultnamespace"))
-                .build();
-        try {
-            assertTrue("distributedlog:// should build bookkeeper based distributedlog namespace",
-                    namespace instanceof BKDistributedLogNamespace);
-        } finally {
-            namespace.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java b/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
new file mode 100644
index 0000000..89b4852
--- /dev/null
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.namespace;
+
+import org.apache.distributedlog.BKDistributedLogNamespace;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Namespace Builder
+ */
+public class TestNamespaceBuilder extends TestDistributedLogBase {
+
+    @Test(timeout = 60000, expected = NullPointerException.class)
+    public void testEmptyBuilder() throws Exception {
+        NamespaceBuilder.newBuilder().build();
+    }
+
+    @Test(timeout = 60000, expected = NullPointerException.class)
+    public void testMissingUri() throws Exception {
+        NamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .build();
+    }
+
+    @Test(timeout = 60000, expected = NullPointerException.class)
+    public void testMissingSchemeInUri() throws Exception {
+        NamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .uri(new URI("/test"))
+                .build();
+    }
+
+    @Test(timeout = 60000, expected = IllegalArgumentException.class)
+    public void testInvalidSchemeInUri() throws Exception {
+        NamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .uri(new URI("dist://invalid/scheme/in/uri"))
+                .build();
+    }
+
+    @Test(timeout = 60000, expected = IllegalArgumentException.class)
+    public void testInvalidSchemeCorrectBackendInUri() throws Exception {
+        NamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .uri(new URI("dist-bk://invalid/scheme/in/uri"))
+                .build();
+    }
+
+    @Test(timeout = 60000, expected = IllegalArgumentException.class)
+    public void testUnknownBackendInUri() throws Exception {
+        NamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .uri(new URI("distributedlog-unknown://invalid/scheme/in/uri"))
+                .build();
+    }
+
+    @Test(timeout = 60000, expected = NullPointerException.class)
+    public void testNullStatsLogger() throws Exception {
+        NamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .uri(new URI("distributedlog-bk://localhost/distributedlog"))
+                .statsLogger(null)
+                .build();
+    }
+
+    @Test(timeout = 60000, expected = NullPointerException.class)
+    public void testNullClientId() throws Exception {
+        NamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .uri(new URI("distributedlog-bk://localhost/distributedlog"))
+                .clientId(null)
+                .build();
+    }
+
+    @Test(timeout = 60000)
+    public void testBuildBKDistributedLogNamespace() throws Exception {
+        Namespace namespace = NamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .uri(new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace"))
+                .build();
+        try {
+            assertTrue("distributedlog-bk:// should build bookkeeper based distributedlog namespace",
+                    namespace instanceof BKDistributedLogNamespace);
+        } finally {
+            namespace.close();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testBuildWhenMissingBackendInUri() throws Exception {
+        Namespace namespace = NamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .uri(new URI("distributedlog://" + zkServers + DLOG_NAMESPACE + "/defaultnamespace"))
+                .build();
+        try {
+            assertTrue("distributedlog:// should build bookkeeper based distributedlog namespace",
+                    namespace instanceof BKDistributedLogNamespace);
+        } finally {
+            namespace.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java b/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java
deleted file mode 100644
index 8949bec..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.rate;
-
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.MockTimer;
-import com.twitter.util.Time$;
-import com.twitter.util.TimeControl;
-
-import org.junit.Test;
-import scala.runtime.BoxedUnit;
-
-import static org.junit.Assert.*;
-
-public class TestMovingAverageRate {
-    interface TcCallback {
-        void apply(TimeControl tc);
-    }
-
-    void withCurrentTimeFrozen(final TcCallback cb) {
-        Time$.MODULE$.withCurrentTimeFrozen(new Function<TimeControl, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(TimeControl time) {
-                cb.apply(time);
-                return BoxedUnit.UNIT;
-            }
-        });
-    }
-
-    private void advance(TimeControl time, MockTimer timer, int timeMs) {
-        Duration duration = Duration.fromMilliseconds(timeMs);
-        time.advance(duration);
-        timer.tick();
-    }
-
-    @Test(timeout = 60000)
-    public void testNoChangeInUnderMinInterval() {
-        withCurrentTimeFrozen(new TcCallback() {
-            @Override
-            public void apply(TimeControl time) {
-                MockTimer timer = new MockTimer();
-                MovingAverageRateFactory factory = new MovingAverageRateFactory(timer);
-                MovingAverageRate avg60 = factory.create(60);
-                avg60.add(1000);
-                assertEquals(0, avg60.get(), 0);
-                advance(time, timer, 1);
-                assertEquals(0, avg60.get(), 0);
-                advance(time, timer, 1);
-                assertEquals(0, avg60.get(), 0);
-            }
-        });
-    }
-
-    @Test(timeout = 60000)
-    public void testFactoryWithMultipleTimers() {
-        withCurrentTimeFrozen(new TcCallback() {
-            @Override
-            public void apply(TimeControl time) {
-                MockTimer timer = new MockTimer();
-                MovingAverageRateFactory factory = new MovingAverageRateFactory(timer);
-                MovingAverageRate avg60 = factory.create(60);
-                MovingAverageRate avg30 = factory.create(30);
-
-                // Can't test this precisely because the Rate class uses its own
-                // ticker. So we can control when it gets sampled but not the time
-                // value it uses. So, just do basic validation.
-                for (int i = 0; i < 30; i++) {
-                    avg60.add(100);
-                    avg30.add(100);
-                    advance(time, timer, 1000);
-                }
-                double s1 = avg60.get();
-                assertTrue(avg30.get() > 0);
-                for (int i = 0; i < 30; i++) {
-                    advance(time, timer, 1000);
-                }
-                assertTrue(avg60.get() > 0);
-                assertTrue(avg60.get() < s1);
-                assertEquals(0.0, avg30.get(), 0);
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java b/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java
index 47e2fae..71bf68d 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java
@@ -22,11 +22,11 @@ import java.net.URI;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.LocalDLMEmulator;
 import org.apache.distributedlog.LogRecordWithDLSN;
-import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.tools.DistributedLogTool.*;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java
deleted file mode 100644
index f9e4eb8..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.fail;
-
-/**
- * Test Case for {@link FutureUtils}
- */
-public class TestFutureUtils {
-
-    static class TestException extends IOException {
-    }
-
-    @Test(timeout = 60000)
-    public void testWithin() throws Exception {
-        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
-                .corePoolSize(1)
-                .name("test-within")
-                .build();
-        final Promise<Void> promiseToTimeout = new Promise<Void>();
-        final Promise<Void> finalPromise = new Promise<Void>();
-        FutureUtils.within(
-                promiseToTimeout,
-                10,
-                TimeUnit.MILLISECONDS,
-                new TestException(),
-                scheduler,
-                "test-within"
-        ).addEventListener(new FutureEventListener<Void>() {
-            @Override
-            public void onFailure(Throwable cause) {
-                FutureUtils.setException(finalPromise, cause);
-            }
-
-            @Override
-            public void onSuccess(Void value) {
-                FutureUtils.setValue(finalPromise, value);
-            }
-        });
-        try {
-            FutureUtils.result(finalPromise);
-            fail("Should fail with TestException");
-        } catch (TestException te) {
-            // expected
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java
index 802649d..807ce02 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.distributedlog.util;
 
+import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.zk.LimitedPermitManager;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java
deleted file mode 100644
index 7bfe5ed..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.FuturePool;
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Future;
-
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-import scala.runtime.BoxedUnit;
-
-public class TestSafeQueueingFuturePool {
-    static final Logger LOG = LoggerFactory.getLogger(TestSafeQueueingFuturePool.class);
-
-    @Rule
-    public TestName runtime = new TestName();
-
-    class TestFuturePool<T> {
-        final ScheduledExecutorService executor;
-        final FuturePool pool;
-        final SafeQueueingFuturePool<T> wrapper;
-        TestFuturePool() {
-            executor = Executors.newScheduledThreadPool(1);
-            pool = new ExecutorServiceFuturePool(executor);
-            wrapper = new SafeQueueingFuturePool<T>(pool);
-        }
-        public void shutdown() {
-            executor.shutdown();
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testSimpleSuccess() throws Exception {
-        TestFuturePool<Void> pool = new TestFuturePool<Void>();
-        final AtomicBoolean result = new AtomicBoolean(false);
-        Future<Void> future = pool.wrapper.apply(new Function0<Void>() {
-            public Void apply() {
-                result.set(true);
-                return null;
-            }
-        });
-        Await.result(future);
-        assertTrue(result.get());
-        pool.shutdown();
-    }
-
-    @Test(timeout = 60000)
-    public void testSimpleFailure() throws Exception {
-        TestFuturePool<Void> pool = new TestFuturePool<Void>();
-        Future<Void> future = pool.wrapper.apply(new Function0<Void>() {
-            public Void apply() {
-                throw new RuntimeException("failed");
-            }
-        });
-        try {
-            Await.result(future);
-            fail("should have thrown");
-        } catch (Exception ex) {
-        }
-        pool.shutdown();
-    }
-
-    @Test(timeout = 60000)
-    public void testFailedDueToClosed() throws Exception {
-        TestFuturePool<Void> pool = new TestFuturePool<Void>();
-        pool.wrapper.close();
-        Future<Void> future = pool.wrapper.apply(new Function0<Void>() {
-            public Void apply() {
-                throw new RuntimeException("failed");
-            }
-        });
-        try {
-            Await.result(future);
-            fail("should have thrown");
-        } catch (RejectedExecutionException ex) {
-        }
-        pool.shutdown();
-    }
-
-    @Test(timeout = 60000)
-    public void testRejectedFailure() throws Exception {
-        TestFuturePool<Void> pool = new TestFuturePool<Void>();
-        final AtomicBoolean result = new AtomicBoolean(false);
-        pool.executor.shutdown();
-        final CountDownLatch latch = new CountDownLatch(1);
-        Future<Void> future = pool.wrapper.apply(new Function0<Void>() {
-            public Void apply() {
-                result.set(true);
-                latch.countDown();
-                return null;
-            }
-        });
-        try {
-            Await.result(future);
-            fail("should have thrown");
-        } catch (RejectedExecutionException ex) {
-        }
-        assertFalse(result.get());
-        pool.wrapper.close();
-        latch.await();
-        assertTrue(result.get());
-        pool.shutdown();
-    }
-
-    @Test(timeout = 60000)
-    public void testRejectedBackupFailure() throws Exception {
-        TestFuturePool<Void> pool = new TestFuturePool<Void>();
-        final AtomicBoolean result = new AtomicBoolean(false);
-        pool.executor.shutdownNow();
-        final CountDownLatch latch1 = new CountDownLatch(1);
-        final CountDownLatch latch2 = new CountDownLatch(1);
-        Future<Void> future1 = pool.wrapper.apply(new Function0<Void>() {
-            public Void apply() {
-                try {
-                    latch1.await();
-                } catch (Exception ex) {
-                }
-                return null;
-            }
-        });
-
-        // Enqueue a set of futures behind.
-        final int blockedCount = 100;
-        final ArrayList<Future<Void>> blockedFutures = new ArrayList<Future<Void>>(blockedCount);
-        final int[] doneArray = new int[blockedCount];
-        final AtomicInteger doneCount = new AtomicInteger(0);
-        for (int i = 0; i < blockedCount; i++) {
-            final int index = i;
-            blockedFutures.add(pool.wrapper.apply(new Function0<Void>() {
-                public Void apply() {
-                    doneArray[index] = doneCount.getAndIncrement();
-                    return null;
-                }
-            }));
-        }
-
-        // All the futures fail when the executor is force closed.
-        latch1.countDown();
-        pool.executor.shutdownNow();
-        for (int i = 0; i < blockedCount; i++) {
-            try {
-                Await.result(blockedFutures.get(i));
-                fail("should have thrown");
-            } catch (RejectedExecutionException ex) {
-            }
-        }
-
-        // None of them have completed.
-        for (int i = 0; i < blockedCount; i++) {
-            assertEquals(0, doneArray[i]);
-        }
-
-        // Close cleans up all pending ops in order.
-        pool.wrapper.close();
-        for (int i = 0; i < blockedCount; i++) {
-            assertEquals(i, doneArray[i]);
-        }
-
-        pool.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
index a9db6e0..acd441c 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
@@ -108,15 +108,15 @@ public class TestUtils extends ZooKeeperClusterTestCase {
     @Test(timeout = 60000)
     public void testZkGetData() throws Exception {
         String path1 = "/zk-get-data/non-existent-path";
-        Versioned<byte[]> data = FutureUtils.result(Utils.zkGetData(zkc.get(), path1, false));
+        Versioned<byte[]> data = Utils.ioResult(Utils.zkGetData(zkc.get(), path1, false));
         assertNull("No data should return from non-existent-path", data.getValue());
         assertNull("No version should return from non-existent-path", data.getVersion());
 
         String path2 = "/zk-get-data/path2";
         byte[] rawData = "test-data".getBytes(UTF_8);
-        FutureUtils.result(Utils.zkAsyncCreateFullPathOptimistic(zkc, path2, rawData,
+        Utils.ioResult(Utils.zkAsyncCreateFullPathOptimistic(zkc, path2, rawData,
                 zkc.getDefaultACL(), CreateMode.PERSISTENT));
-        data = FutureUtils.result(Utils.zkGetData(zkc.get(), path2, false));
+        data = Utils.ioResult(Utils.zkGetData(zkc.get(), path2, false));
         assertArrayEquals("Data should return as written",
                 rawData, data.getValue());
         assertEquals("Version should be zero",

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/pom.xml b/distributedlog-protocol/pom.xml
index a483444..7197feb 100644
--- a/distributedlog-protocol/pom.xml
+++ b/distributedlog-protocol/pom.xml
@@ -26,35 +26,15 @@
   <name>Apache DistributedLog :: Protocol</name>
   <dependencies>
     <dependency>
-      <groupId>org.apache.bookkeeper.stats</groupId>
-      <artifactId>bookkeeper-stats-api</artifactId>
-      <version>${bookkeeper.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-common</artifactId>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>${commons-lang.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <version>${commons-codec.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>finagle-core_2.11</artifactId>
-      <version>${finagle.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j.version}</version>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>${lombok.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>net.jpountz.lz4</groupId>
@@ -67,6 +47,18 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency> 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
index 9d2d7a7..2a60ff3 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
@@ -26,12 +26,6 @@ import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
 import static org.apache.distributedlog.LogRecordSet.NULL_OP_STATS_LOGGER;
 import static org.apache.distributedlog.LogRecordSet.VERSION;
 
-import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.exceptions.WriteException;
-import org.apache.distributedlog.io.Buffer;
-import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.distributedlog.io.CompressionUtils;
-import com.twitter.util.Promise;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -39,6 +33,12 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.Buffer;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +52,7 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
     private final Buffer buffer;
     private final DataOutputStream writer;
     private final WritableByteChannel writeChannel;
-    private final List<Promise<DLSN>> promiseList;
+    private final List<CompletableFuture<DLSN>> promiseList;
     private final CompressionCodec.Type codec;
     private final int codecCode;
     private int count = 0;
@@ -61,7 +61,7 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
     EnvelopedRecordSetWriter(int initialBufferSize,
                              CompressionCodec.Type codec) {
         this.buffer = new Buffer(Math.max(initialBufferSize, HEADER_LEN));
-        this.promiseList = new LinkedList<Promise<DLSN>>();
+        this.promiseList = new LinkedList<CompletableFuture<DLSN>>();
         this.codec = codec;
         switch (codec) {
             case LZ4:
@@ -84,13 +84,13 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
         this.writeChannel = Channels.newChannel(writer);
     }
 
-    synchronized List<Promise<DLSN>> getPromiseList() {
+    synchronized List<CompletableFuture<DLSN>> getPromiseList() {
         return promiseList;
     }
 
     @Override
     public synchronized void writeRecord(ByteBuffer record,
-                                         Promise<DLSN> transmitPromise)
+                                         CompletableFuture<DLSN> transmitPromise)
             throws LogRecordTooLongException, WriteException {
         int logRecordSize = record.remaining();
         if (logRecordSize > MAX_LOGRECORD_SIZE) {
@@ -111,16 +111,16 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
 
     private synchronized void satisfyPromises(long lssn, long entryId, long startSlotId) {
         long nextSlotId = startSlotId;
-        for (Promise<DLSN> promise : promiseList) {
-            promise.setValue(new DLSN(lssn, entryId, nextSlotId));
+        for (CompletableFuture<DLSN> promise : promiseList) {
+            promise.complete(new DLSN(lssn, entryId, nextSlotId));
             nextSlotId++;
         }
         promiseList.clear();
     }
 
     private synchronized void cancelPromises(Throwable reason) {
-        for (Promise<DLSN> promise : promiseList) {
-            promise.setException(reason);
+        for (CompletableFuture<DLSN> promise : promiseList) {
+            promise.completeExceptionally(reason);
         }
         promiseList.clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
index 375ed3f..55b20ff 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
@@ -19,15 +19,15 @@ package org.apache.distributedlog;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.exceptions.WriteException;
-import org.apache.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.CompressionCodec;
 
 /**
  * A set of {@link LogRecord}s.
@@ -134,7 +134,7 @@ public class LogRecordSet {
          * @throws LogRecordTooLongException if the record is too long
          * @throws WriteException when encountered exception writing the record
          */
-        void writeRecord(ByteBuffer record, Promise<DLSN> transmitPromise)
+        void writeRecord(ByteBuffer record, CompletableFuture<DLSN> transmitPromise)
                 throws LogRecordTooLongException, WriteException;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java
deleted file mode 100644
index f8cdea4..0000000
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.annotations;
-
-/**
- * Common annotation types.
- */
-public class DistributedLogAnnotations {
-    /**
-     * Annotation to identify flaky tests in DistributedLog.
-     * As and when we find that a test is flaky, we'll add this annotation to it for reference.
-     */
-    public @interface FlakyTest {}
-
-    /**
-     * Annotation to specify the occurrence of a compression operation. These are CPU intensive
-     * and should be avoided in low-latency paths.
-     */
-    public @interface Compression {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java
deleted file mode 100644
index 0922f14..0000000
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines annotations used across distributedlog project.
- */
-package org.apache.distributedlog.annotations;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java
deleted file mode 100644
index b5280c9..0000000
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * Utils for bit mask operations.
- */
-public class BitMaskUtils {
-
-    /**
-     * 1) Unset all bits where value in mask is set.
-     * 2) Set these bits to value specified by newValue.
-     *
-     * <p>e.g.
-     * if oldValue = 1010, mask = 0011, newValue = 0001
-     * 1) 1010 -> 1000
-     * 2) 1000 -> 1001
-     *
-     * @param oldValue expected old value
-     * @param mask the mask of the value for updates
-     * @param newValue new value to set
-     * @return updated value
-     */
-    public static long set(long oldValue, long mask, long newValue) {
-        checkArgument(oldValue >= 0L && mask >= 0L && newValue >= 0L);
-        return ((oldValue & (~mask)) | (newValue & mask));
-    }
-
-    /**
-     * Get the bits where mask is 1.
-     *
-     * @param value value
-     * @param mask mask of the value
-     * @return the bit of the mask
-     */
-    public static long get(long value, long mask) {
-        checkArgument(value >= 0L && mask >= 0L);
-        return (value & mask);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java
deleted file mode 100644
index ee17950..0000000
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * defines the utilities used across the project.
- */
-package org.apache.distributedlog.util;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
index 95e03ab..1c5db24 100644
--- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
+++ b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
@@ -25,15 +25,14 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.LogRecordSet.Reader;
 import org.apache.distributedlog.LogRecordSet.Writer;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
 import org.apache.distributedlog.io.CompressionCodec.Type;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import java.nio.ByteBuffer;
-import java.util.List;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.junit.Test;
 
 /**
@@ -72,7 +71,7 @@ public class TestLogRecordSet {
 
         ByteBuffer dataBuf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
         try {
-            writer.writeRecord(dataBuf, new Promise<DLSN>());
+            writer.writeRecord(dataBuf, new CompletableFuture<DLSN>());
             fail("Should fail on writing large record");
         } catch (LogRecordTooLongException lrtle) {
             // expected
@@ -111,18 +110,18 @@ public class TestLogRecordSet {
         assertEquals("zero user bytes", HEADER_LEN, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
 
-        List<Future<DLSN>> writePromiseList = Lists.newArrayList();
+        List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
         /// write first 5 records
         for (int i = 0; i < 5; i++) {
             ByteBuffer record = ByteBuffer.wrap(("record-" + i).getBytes(UTF_8));
-            Promise<DLSN> writePromise = new Promise<DLSN>();
+            CompletableFuture<DLSN> writePromise = new CompletableFuture<>();
             writer.writeRecord(record, writePromise);
             writePromiseList.add(writePromise);
             assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords());
         }
         ByteBuffer dataBuf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
         try {
-            writer.writeRecord(dataBuf, new Promise<DLSN>());
+            writer.writeRecord(dataBuf, new CompletableFuture<>());
             fail("Should fail on writing large record");
         } catch (LogRecordTooLongException lrtle) {
             // expected
@@ -132,7 +131,7 @@ public class TestLogRecordSet {
         /// write another 5 records
         for (int i = 0; i < 5; i++) {
             ByteBuffer record = ByteBuffer.wrap(("record-" + (i + 5)).getBytes(UTF_8));
-            Promise<DLSN> writePromise = new Promise<DLSN>();
+            CompletableFuture<DLSN> writePromise = new CompletableFuture<>();
             writer.writeRecord(record, writePromise);
             writePromiseList.add(writePromise);
             assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords());
@@ -143,7 +142,7 @@ public class TestLogRecordSet {
 
         // Test transmit complete
         writer.completeTransmit(1L, 1L, 10L);
-        List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
+        List<DLSN> writeResults = FutureUtils.result(FutureUtils.collect(writePromiseList));
         for (int i = 0; i < 10; i++) {
             assertEquals(new DLSN(1L, 1L, 10L + i), writeResults.get(i));
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java
deleted file mode 100644
index 20cf53c..0000000
--- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.notification.Failure;
-
-/**
- * Test Case for {@link TimedOutTestsListener}.
- */
-public class TestTimedOutTestsListener {
-
-    private static class Deadlock {
-        private CyclicBarrier barrier = new CyclicBarrier(6);
-
-        public Deadlock() {
-            DeadlockThread[] dThreads = new DeadlockThread[6];
-
-            Monitor a = new Monitor("a");
-            Monitor b = new Monitor("b");
-            Monitor c = new Monitor("c");
-            dThreads[0] = new DeadlockThread("MThread-1", a, b);
-            dThreads[1] = new DeadlockThread("MThread-2", b, c);
-            dThreads[2] = new DeadlockThread("MThread-3", c, a);
-
-            Lock d = new ReentrantLock();
-            Lock e = new ReentrantLock();
-            Lock f = new ReentrantLock();
-
-            dThreads[3] = new DeadlockThread("SThread-4", d, e);
-            dThreads[4] = new DeadlockThread("SThread-5", e, f);
-            dThreads[5] = new DeadlockThread("SThread-6", f, d);
-
-            // make them daemon threads so that the test will exit
-            for (int i = 0; i < 6; i++) {
-                dThreads[i].setDaemon(true);
-                dThreads[i].start();
-            }
-        }
-
-        class DeadlockThread extends Thread {
-            private Lock lock1 = null;
-
-            private Lock lock2 = null;
-
-            private Monitor mon1 = null;
-
-            private Monitor mon2 = null;
-
-            private boolean useSync;
-
-            DeadlockThread(String name, Lock lock1, Lock lock2) {
-                super(name);
-                this.lock1 = lock1;
-                this.lock2 = lock2;
-                this.useSync = true;
-            }
-
-            DeadlockThread(String name, Monitor mon1, Monitor mon2) {
-                super(name);
-                this.mon1 = mon1;
-                this.mon2 = mon2;
-                this.useSync = false;
-            }
-
-            public void run() {
-                if (useSync) {
-                    syncLock();
-                } else {
-                    monitorLock();
-                }
-            }
-
-            private void syncLock() {
-                lock1.lock();
-                try {
-                    try {
-                        barrier.await();
-                    } catch (Exception e) {
-                    }
-                    goSyncDeadlock();
-                } finally {
-                    lock1.unlock();
-                }
-            }
-
-            private void goSyncDeadlock() {
-                try {
-                    barrier.await();
-                } catch (Exception e) {
-                }
-                lock2.lock();
-                throw new RuntimeException("should not reach here.");
-            }
-
-            private void monitorLock() {
-                synchronized (mon1) {
-                    try {
-                        barrier.await();
-                    } catch (Exception e) {
-                    }
-                    goMonitorDeadlock();
-                }
-            }
-
-            private void goMonitorDeadlock() {
-                try {
-                    barrier.await();
-                } catch (Exception e) {
-                }
-                synchronized (mon2) {
-                    throw new RuntimeException(getName() + " should not reach here.");
-                }
-            }
-        }
-
-        class Monitor {
-            String name;
-
-            Monitor(String name) {
-                this.name = name;
-            }
-        }
-
-    }
-
-    @Test(timeout = 500)
-    public void testThreadDumpAndDeadlocks() throws Exception {
-        new Deadlock();
-        String s = null;
-        while (true) {
-            s = TimedOutTestsListener.buildDeadlockInfo();
-            if (s != null) {
-                break;
-            }
-            Thread.sleep(100);
-        }
-
-        Assert.assertEquals(3, countStringOccurrences(s, "BLOCKED"));
-
-        Failure failure = new Failure(null, new Exception(TimedOutTestsListener.TEST_TIMED_OUT_PREFIX));
-        StringWriter writer = new StringWriter();
-        new TimedOutTestsListener(new PrintWriter(writer)).testFailure(failure);
-        String out = writer.toString();
-
-        Assert.assertTrue(out.contains("THREAD DUMP"));
-        Assert.assertTrue(out.contains("DEADLOCKS DETECTED"));
-
-        System.out.println(out);
-    }
-
-    private int countStringOccurrences(String s, String substr) {
-        int n = 0;
-        int index = 0;
-        while ((index = s.indexOf(substr, index) + 1) != 0) {
-            n++;
-        }
-        return n;
-    }
-
-}