You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:45 UTC

[13/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java
deleted file mode 100644
index 79f9c32..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.function;
-
-import scala.runtime.AbstractFunction1;
-
-import java.util.List;
-
-public class VoidFunctions {
-
-    public static final AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC =
-            new AbstractFunction1<List<Void>, Void>() {
-                @Override
-                public Void apply(List<Void> list) {
-                    return null;
-                }
-            };
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
index dbe5400..21fe227 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
@@ -24,7 +24,7 @@ import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.BookKeeperClientBuilder;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.MetadataAccessor;
+import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClientBuilder;
 import org.apache.distributedlog.acl.AccessControlManager;
@@ -47,7 +47,7 @@ import org.apache.distributedlog.metadata.LogMetadataStore;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.namespace.NamespaceDriverManager;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.bookkeeper.feature.FeatureProvider;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
index 0761cfc..7069cbb 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
@@ -19,23 +19,22 @@ package org.apache.distributedlog.impl;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.callback.NamespaceListener;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.metadata.LogMetadataStore;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
-import java.net.URI;
-import java.util.Iterator;
-import java.util.List;
-
 import static org.apache.distributedlog.util.DLUtils.*;
 
 /**
@@ -60,18 +59,18 @@ public class ZKLogMetadataStore implements LogMetadataStore {
     }
 
     @Override
-    public Future<URI> createLog(String logName) {
-        return Future.value(namespace);
+    public CompletableFuture<URI> createLog(String logName) {
+        return FutureUtils.value(namespace);
     }
 
     @Override
-    public Future<Optional<URI>> getLogLocation(String logName) {
-        return Future.value(nsOptional);
+    public CompletableFuture<Optional<URI>> getLogLocation(String logName) {
+        return FutureUtils.value(nsOptional);
     }
 
     @Override
-    public Future<Iterator<String>> getLogs() {
-        final Promise<Iterator<String>> promise = new Promise<Iterator<String>>();
+    public CompletableFuture<Iterator<String>> getLogs() {
+        final CompletableFuture<Iterator<String>> promise = new CompletableFuture<Iterator<String>>();
         final String nsRootPath = namespace.getPath();
         try {
             final ZooKeeper zk = zkc.get();
@@ -89,30 +88,30 @@ public class ZKLogMetadataStore implements LogMetadataStore {
                                             results.add(child);
                                         }
                                     }
-                                    promise.setValue(results.iterator());
+                                    promise.complete(results.iterator());
                                 } else if (KeeperException.Code.NONODE.intValue() == rc) {
                                     List<String> streams = Lists.newLinkedList();
-                                    promise.setValue(streams.iterator());
+                                    promise.complete(streams.iterator());
                                 } else {
-                                    promise.setException(new ZKException("Error reading namespace " + nsRootPath,
+                                    promise.completeExceptionally(new ZKException("Error reading namespace " + nsRootPath,
                                             KeeperException.Code.get(rc)));
                                 }
                             }
                         }, null);
                     } else if (KeeperException.Code.NONODE.intValue() == syncRc) {
                         List<String> streams = Lists.newLinkedList();
-                        promise.setValue(streams.iterator());
+                        promise.complete(streams.iterator());
                     } else {
-                        promise.setException(new ZKException("Error reading namespace " + nsRootPath,
+                        promise.completeExceptionally(new ZKException("Error reading namespace " + nsRootPath,
                                 KeeperException.Code.get(syncRc)));
                     }
                 }
             }, null);
             zkc.get();
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
         return promise;
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
index b9cb374..f747045 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog.impl;
 
 import com.google.common.collect.ImmutableList;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -29,17 +30,15 @@ import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Transaction.OpListener;
+import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.zk.DefaultZKOp;
 import org.apache.distributedlog.zk.ZKOp;
 import org.apache.distributedlog.zk.ZKTransaction;
 import org.apache.distributedlog.zk.ZKVersionedSetOp;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -116,7 +115,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         @Override
         public void run() {
             if (null != store.listeners.get(logSegmentsPath)) {
-                store.zkGetLogSegmentNames(logSegmentsPath, store).addEventListener(this);
+                store.zkGetLogSegmentNames(logSegmentsPath, store).whenComplete(this);
             } else {
                 logger.debug("Log segments listener for {} has been removed.", logSegmentsPath);
             }
@@ -350,18 +349,18 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
     }
 
     @Override
-    public Future<LogSegmentMetadata> getLogSegment(String logSegmentPath) {
+    public CompletableFuture<LogSegmentMetadata> getLogSegment(String logSegmentPath) {
         return LogSegmentMetadata.read(zkc, logSegmentPath, skipMinVersionCheck);
     }
 
-    Future<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) {
-        Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>();
+    CompletableFuture<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) {
+        CompletableFuture<Versioned<List<String>>> result = new CompletableFuture<Versioned<List<String>>>();
         try {
             zkc.get().getChildren(logSegmentsPath, watcher, this, result);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            result.setException(FutureUtils.zkException(e, logSegmentsPath));
+            result.completeExceptionally(Utils.zkException(e, logSegmentsPath));
         } catch (InterruptedException e) {
-            result.setException(FutureUtils.zkException(e, logSegmentsPath));
+            result.completeExceptionally(Utils.zkException(e, logSegmentsPath));
         }
         return result;
     }
@@ -369,21 +368,21 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
     @Override
     @SuppressWarnings("unchecked")
     public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-        Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>) ctx);
+        CompletableFuture<Versioned<List<String>>> result = ((CompletableFuture<Versioned<List<String>>>) ctx);
         if (KeeperException.Code.OK.intValue() == rc) {
             /** cversion: the number of changes to the children of this znode **/
             ZkVersion zkVersion = new ZkVersion(stat.getCversion());
-            result.setValue(new Versioned(children, zkVersion));
+            result.complete(new Versioned(children, zkVersion));
         } else if (KeeperException.Code.NONODE.intValue() == rc) {
-            result.setException(new LogNotFoundException("Log " + path + " not found"));
+            result.completeExceptionally(new LogNotFoundException("Log " + path + " not found"));
         } else {
-            result.setException(new ZKException("Failed to get log segments from " + path,
+            result.completeExceptionally(new ZKException("Failed to get log segments from " + path,
                     KeeperException.Code.get(rc)));
         }
     }
 
     @Override
-    public Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
+    public CompletableFuture<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
                                                               LogSegmentNamesListener listener) {
         Watcher zkWatcher;
         if (null == listener) {
@@ -422,9 +421,9 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
                 closeLock.readLock().unlock();
             }
         }
-        Future<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher);
+        CompletableFuture<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher);
         if (null != listener) {
-            getLogSegmentNamesResult.addEventListener(new ReadLogSegmentsTask(logSegmentsPath, this));
+            getLogSegmentNamesResult.whenComplete(new ReadLogSegmentsTask(logSegmentsPath, this));
         }
         return zkGetLogSegmentNames(logSegmentsPath, zkWatcher);
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
index 551cc44..b3fe456 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
@@ -21,17 +21,16 @@ import java.io.IOException;
 import java.net.URI;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.MetadataAccessor;
+import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClientBuilder;
 import org.apache.distributedlog.exceptions.AlreadyClosedException;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.RetryPolicy;
@@ -45,7 +44,7 @@ import static org.apache.distributedlog.impl.BKNamespaceDriver.getZKServersFromD
 public class ZKMetadataAccessor implements MetadataAccessor {
     static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class);
     protected final String name;
-    protected Promise<Void> closePromise;
+    protected CompletableFuture<Void> closePromise;
     protected final URI uri;
     // zookeeper clients
     // NOTE: The actual zookeeper client is initialized lazily when it is referenced by
@@ -213,13 +212,13 @@ public class ZKMetadataAccessor implements MetadataAccessor {
      * @return future represents the close result.
      */
     @Override
-    public Future<Void> asyncClose() {
-        Promise<Void> closeFuture;
+    public CompletableFuture<Void> asyncClose() {
+        CompletableFuture<Void> closeFuture;
         synchronized (this) {
             if (null != closePromise) {
                 return closePromise;
             }
-            closeFuture = closePromise = new Promise<Void>();
+            closeFuture = closePromise = new CompletableFuture<Void>();
         }
         // NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests
         //       the managers created by the namespace - whose zkc will be closed by namespace
@@ -233,13 +232,13 @@ public class ZKMetadataAccessor implements MetadataAccessor {
         } catch (Exception e) {
             LOG.warn("Exception while closing distributed log manager", e);
         }
-        FutureUtils.setValue(closeFuture, null);
+        FutureUtils.complete(closeFuture, null);
         return closeFuture;
     }
 
     @Override
     public void close() throws IOException {
-        FutureUtils.result(asyncClose());
+        Utils.ioResult(asyncClose());
     }
 
     public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java
index 63a81bd..e4a175c 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java
@@ -19,10 +19,9 @@ package org.apache.distributedlog.impl.acl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TMemoryBuffer;
@@ -100,8 +99,8 @@ public class ZKAccessControl {
         return accessControlEntry;
     }
 
-    public Future<ZKAccessControl> create(ZooKeeperClient zkc) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+    public CompletableFuture<ZKAccessControl> create(ZooKeeperClient zkc) {
+        final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>();
         try {
             zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT,
                     new AsyncCallback.StringCallback() {
@@ -109,48 +108,48 @@ public class ZKAccessControl {
                         public void processResult(int rc, String path, Object ctx, String name) {
                             if (KeeperException.Code.OK.intValue() == rc) {
                                 ZKAccessControl.this.zkVersion = 0;
-                                promise.setValue(ZKAccessControl.this);
+                                promise.complete(ZKAccessControl.this);
                             } else {
-                                promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                                promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                             }
                         }
                     }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (IOException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
         return promise;
     }
 
-    public Future<ZKAccessControl> update(ZooKeeperClient zkc) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+    public CompletableFuture<ZKAccessControl> update(ZooKeeperClient zkc) {
+        final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>();
         try {
             zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() {
                 @Override
                 public void processResult(int rc, String path, Object ctx, Stat stat) {
                     if (KeeperException.Code.OK.intValue() == rc) {
                         ZKAccessControl.this.zkVersion = stat.getVersion();
-                        promise.setValue(ZKAccessControl.this);
+                        promise.complete(ZKAccessControl.this);
                     } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                        promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                     }
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (IOException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
         return promise;
     }
 
-    public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+    public static CompletableFuture<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) {
+        final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>();
 
         try {
             zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() {
@@ -159,25 +158,25 @@ public class ZKAccessControl {
                     if (KeeperException.Code.OK.intValue() == rc) {
                         try {
                             AccessControlEntry ace = deserialize(zkPath, data);
-                            promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion()));
+                            promise.complete(new ZKAccessControl(ace, zkPath, stat.getVersion()));
                         } catch (IOException ioe) {
-                            promise.setException(ioe);
+                            promise.completeExceptionally(ioe);
                         }
                     } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                        promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                     }
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
         return promise;
     }
 
-    public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) {
-        final Promise<Void> promise = new Promise<Void>();
+    public static CompletableFuture<Void> delete(final ZooKeeperClient zkc, final String zkPath) {
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
 
         try {
             zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() {
@@ -185,16 +184,16 @@ public class ZKAccessControl {
                 public void processResult(int rc, String path, Object ctx) {
                     if (KeeperException.Code.OK.intValue() == rc ||
                             KeeperException.Code.NONODE.intValue() == rc) {
-                        promise.setValue(null);
+                        promise.complete(null);
                     } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                        promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                     }
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
         return promise;
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java
index be8db64..3dbde6a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java
@@ -18,16 +18,15 @@
 package org.apache.distributedlog.impl.acl;
 
 import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.acl.AccessControlManager;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -76,7 +75,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
         this.scheduledExecutorService = scheduledExecutorService;
         this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>();
         try {
-            Await.result(fetchDefaultAccessControlEntry());
+            FutureUtils.result(fetchDefaultAccessControlEntry());
         } catch (Throwable t) {
             if (t instanceof InterruptedException) {
                 throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t);
@@ -90,7 +89,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
         }
 
         try {
-            Await.result(fetchAccessControlEntries());
+            FutureUtils.result(fetchAccessControlEntries());
         } catch (Throwable t) {
             if (t instanceof InterruptedException) {
                 throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t);
@@ -140,19 +139,19 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
         closed = true;
     }
 
-    private Future<Void> fetchAccessControlEntries() {
-        final Promise<Void> promise = new Promise<Void>();
+    private CompletableFuture<Void> fetchAccessControlEntries() {
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
         fetchAccessControlEntries(promise);
         return promise;
     }
 
-    private void fetchAccessControlEntries(final Promise<Void> promise) {
+    private void fetchAccessControlEntries(final CompletableFuture<Void> promise) {
         try {
             zkc.get().getChildren(zkRootPath, this, new AsyncCallback.Children2Callback() {
                 @Override
                 public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                     if (KeeperException.Code.OK.intValue() != rc) {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                        promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                         return;
                     }
                     Set<String> streamsReceived = new HashSet<String>();
@@ -166,7 +165,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
                         }
                     }
                     if (streamsReceived.isEmpty()) {
-                        promise.setValue(null);
+                        promise.complete(null);
                         return;
                     }
                     final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size());
@@ -174,7 +173,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
                     for (String s : streamsReceived) {
                         final String streamName = s;
                         ZKAccessControl.read(zkc, zkRootPath + "/" + streamName, null)
-                                .addEventListener(new FutureEventListener<ZKAccessControl>() {
+                                .whenComplete(new FutureEventListener<ZKAccessControl>() {
 
                                     @Override
                                     public void onSuccess(ZKAccessControl accessControl) {
@@ -193,7 +192,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
                                             streamEntries.remove(streamName);
                                         } else {
                                             if (1 == numFailures.incrementAndGet()) {
-                                                promise.setException(cause);
+                                                promise.completeExceptionally(cause);
                                             }
                                         }
                                         complete();
@@ -201,7 +200,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
 
                                     private void complete() {
                                         if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) {
-                                            promise.setValue(null);
+                                            promise.complete(null);
                                         }
                                     }
                                 });
@@ -209,28 +208,28 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
     }
 
-    private Future<ZKAccessControl> fetchDefaultAccessControlEntry() {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+    private CompletableFuture<ZKAccessControl> fetchDefaultAccessControlEntry() {
+        final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>();
         fetchDefaultAccessControlEntry(promise);
         return promise;
     }
 
-    private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> promise) {
+    private void fetchDefaultAccessControlEntry(final CompletableFuture<ZKAccessControl> promise) {
         ZKAccessControl.read(zkc, zkRootPath, this)
-            .addEventListener(new FutureEventListener<ZKAccessControl>() {
+            .whenComplete(new FutureEventListener<ZKAccessControl>() {
                 @Override
                 public void onSuccess(ZKAccessControl accessControl) {
                     logger.info("Default Access Control will be changed from {} to {}",
                                 ZKAccessControlManager.this.defaultAccessControl,
                                 accessControl);
                     ZKAccessControlManager.this.defaultAccessControl = accessControl;
-                    promise.setValue(accessControl);
+                    promise.complete(accessControl);
                 }
 
                 @Override
@@ -239,21 +238,21 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
                         logger.info("Default Access Control is missing, creating one for {} ...", zkRootPath);
                         createDefaultAccessControlEntryIfNeeded(promise);
                     } else {
-                        promise.setException(cause);
+                        promise.completeExceptionally(cause);
                     }
                 }
             });
     }
 
-    private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) {
+    private void createDefaultAccessControlEntryIfNeeded(final CompletableFuture<ZKAccessControl> promise) {
         ZooKeeper zk;
         try {
             zk = zkc.get();
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
             return;
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
             return;
         }
         ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(),
@@ -264,7 +263,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
                     logger.info("Created zk path {} for default ACL.", zkRootPath);
                     fetchDefaultAccessControlEntry(promise);
                 } else {
-                    promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                    promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                 }
             }
         }, null);
@@ -277,7 +276,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
         scheduledExecutorService.schedule(new Runnable() {
             @Override
             public void run() {
-                fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
+                fetchDefaultAccessControlEntry().whenComplete(new FutureEventListener<ZKAccessControl>() {
                     @Override
                     public void onSuccess(ZKAccessControl value) {
                         // no-op
@@ -305,7 +304,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
         scheduledExecutorService.schedule(new Runnable() {
             @Override
             public void run() {
-                fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
+                fetchAccessControlEntries().whenComplete(new FutureEventListener<Void>() {
                     @Override
                     public void onSuccess(Void value) {
                         // no-op
@@ -328,10 +327,10 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
         scheduledExecutorService.schedule(new Runnable() {
             @Override
             public void run() {
-                fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
+                fetchDefaultAccessControlEntry().whenComplete(new FutureEventListener<ZKAccessControl>() {
                     @Override
                     public void onSuccess(ZKAccessControl value) {
-                        fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
+                        fetchAccessControlEntries().whenComplete(new FutureEventListener<Void>() {
                             @Override
                             public void onSuccess(Void value) {
                                 // no-op

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
index 5d7af9d..17515c3 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.callback.NamespaceListener;
@@ -32,12 +33,10 @@ import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.ZKNamespaceWatcher;
 import org.apache.distributedlog.metadata.LogMetadataStore;
 import org.apache.distributedlog.namespace.NamespaceWatcher;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -49,8 +48,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
 import java.net.URI;
@@ -80,8 +77,9 @@ import static com.google.common.base.Charsets.UTF_8;
  * NOTE: current federated namespace isn't optimized for deletion/creation. so don't use it in the workloads
  *       that have lots of creations or deletions.
  */
-public class FederatedZKLogMetadataStore extends NamespaceWatcher implements LogMetadataStore, Watcher, Runnable,
-        FutureEventListener<Set<URI>> {
+public class FederatedZKLogMetadataStore
+        extends NamespaceWatcher
+        implements LogMetadataStore, Watcher, Runnable, FutureEventListener<Set<URI>> {
 
     static final Logger logger = LoggerFactory.getLogger(FederatedZKLogMetadataStore.class);
 
@@ -100,7 +98,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
      * @throws KeeperException
      */
     public static void createFederatedNamespace(URI namespace, ZooKeeperClient zkc)
-            throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
+            throws IOException, KeeperException {
         String zkSubNamespacesPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES;
         Utils.zkCreateFullPathOptimistic(zkc, zkSubNamespacesPath, new byte[0],
                 zkc.getDefaultACL(), CreateMode.PERSISTENT);
@@ -112,7 +110,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
     class SubNamespace implements NamespaceListener {
         final URI uri;
         final ZKNamespaceWatcher watcher;
-        Promise<Set<String>> logsFuture = new Promise<Set<String>>();
+        CompletableFuture<Set<String>> logsFuture = new CompletableFuture<Set<String>>();
 
         SubNamespace(URI uri) {
             this.uri = uri;
@@ -124,7 +122,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
             this.watcher.watchNamespaceChanges();
         }
 
-        synchronized Future<Set<String>> getLogs() {
+        synchronized CompletableFuture<Set<String>> getLogs() {
             return logsFuture;
         }
 
@@ -134,16 +132,16 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
             Set<String> oldLogs = Sets.newHashSet();
 
             // update the sub namespace cache
-            Promise<Set<String>> newLogsPromise;
+            CompletableFuture<Set<String>> newLogsPromise;
             synchronized (this) {
-                if (logsFuture.isDefined()) { // the promise is already satisfied
+                if (logsFuture.isDone()) { // the promise is already satisfied
                     try {
                         oldLogs = FutureUtils.result(logsFuture);
-                    } catch (IOException e) {
+                    } catch (Exception e) {
                         logger.error("Unexpected exception when getting logs from a satisified future of {} : ",
                                 uri, e);
                     }
-                    logsFuture = new Promise<Set<String>>();
+                    logsFuture = new CompletableFuture<Set<String>>();
                 }
 
                 // update the reverse cache
@@ -163,7 +161,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
                 }
                 newLogsPromise = logsFuture;
             }
-            newLogsPromise.setValue(newLogs);
+            newLogsPromise.complete(newLogs);
 
             // notify namespace changes
             notifyOnNamespaceChanges();
@@ -203,7 +201,16 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
         this.maxLogsPerSubnamespace = conf.getFederatedMaxLogsPerSubnamespace();
 
         // fetch the sub namespace
-        Set<URI> uris = FutureUtils.result(fetchSubNamespaces(this));
+        Set<URI> uris;
+        try {
+            uris = FutureUtils.result(fetchSubNamespaces(this));
+        } catch (Exception e) {
+            if (e instanceof IOException) {
+                throw (IOException) e;
+            } else {
+                throw new IOException(e);
+            }
+        }
         for (URI uri : uris) {
             SubNamespace subNs = new SubNamespace(uri);
             if (null == subNamespaces.putIfAbsent(uri, subNs)) {
@@ -228,21 +235,21 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
         }
     }
 
-    private <T> Future<T> postStateCheck(Future<T> future) {
-        final Promise<T> postCheckedPromise = new Promise<T>();
-        future.addEventListener(new FutureEventListener<T>() {
+    private <T> CompletableFuture<T> postStateCheck(CompletableFuture<T> future) {
+        final CompletableFuture<T> postCheckedPromise = new CompletableFuture<T>();
+        future.whenComplete(new FutureEventListener<T>() {
             @Override
             public void onSuccess(T value) {
                 if (duplicatedLogFound.get()) {
-                    postCheckedPromise.setException(new UnexpectedException("Duplicate log found under " + namespace));
+                    postCheckedPromise.completeExceptionally(new UnexpectedException("Duplicate log found under " + namespace));
                 } else {
-                    postCheckedPromise.setValue(value);
+                    postCheckedPromise.complete(value);
                 }
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                postCheckedPromise.setException(cause);
+                postCheckedPromise.completeExceptionally(cause);
             }
         });
         return postCheckedPromise;
@@ -273,13 +280,13 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
                 namespace.getFragment());
     }
 
-    Future<Set<URI>> getCachedSubNamespaces() {
+    CompletableFuture<Set<URI>> getCachedSubNamespaces() {
         Set<URI> nsSet = subNamespaces.keySet();
-        return Future.value(nsSet);
+        return FutureUtils.value(nsSet);
     }
 
-    Future<Set<URI>> fetchSubNamespaces(final Watcher watcher) {
-        final Promise<Set<URI>> promise = new Promise<Set<URI>>();
+    CompletableFuture<Set<URI>> fetchSubNamespaces(final Watcher watcher) {
+        final CompletableFuture<Set<URI>> promise = new CompletableFuture<Set<URI>>();
         try {
             zkc.get().sync(this.zkSubnamespacesPath, new AsyncCallback.VoidCallback() {
                 @Override
@@ -287,27 +294,27 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
                     if (Code.OK.intValue() == rc) {
                         fetchSubNamespaces(watcher, promise);
                     } else {
-                        promise.setException(KeeperException.create(Code.get(rc)));
+                        promise.completeExceptionally(KeeperException.create(Code.get(rc)));
                     }
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
         return promise;
     }
 
     private void fetchSubNamespaces(Watcher watcher,
-                                    final Promise<Set<URI>> promise) {
+                                    final CompletableFuture<Set<URI>> promise) {
         try {
             zkc.get().getChildren(this.zkSubnamespacesPath, watcher,
                     new AsyncCallback.Children2Callback() {
                         @Override
                         public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                             if (Code.NONODE.intValue() == rc) {
-                                promise.setException(new UnexpectedException(
+                                promise.completeExceptionally(new UnexpectedException(
                                         "The subnamespaces don't exist for the federated namespace " + namespace));
                             } else if (Code.OK.intValue() == rc) {
                                 Set<URI> subnamespaces = Sets.newHashSet();
@@ -318,26 +325,26 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
                                     }
                                 } catch (URISyntaxException use) {
                                     logger.error("Invalid sub namespace uri found : ", use);
-                                    promise.setException(new UnexpectedException(
+                                    promise.completeExceptionally(new UnexpectedException(
                                             "Invalid sub namespace uri found in " + namespace, use));
                                     return;
                                 }
                                 // update the sub namespaces set before update version
                                 setZkSubnamespacesVersion(stat.getVersion());
-                                promise.setValue(subnamespaces);
+                                promise.complete(subnamespaces);
                             }
                         }
                     }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
     }
 
     @Override
     public void run() {
-        fetchSubNamespaces(this).addEventListener(this);
+        fetchSubNamespaces(this).whenComplete(this);
     }
 
     @Override
@@ -370,7 +377,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
         }
         if (Event.EventType.NodeChildrenChanged == watchedEvent.getType()) {
             // fetch the namespace
-            fetchSubNamespaces(this).addEventListener(this);
+            fetchSubNamespaces(this).whenComplete(this);
         }
     }
 
@@ -378,27 +385,27 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
     // Log Related Methods
     //
 
-    private <A> Future<A> duplicatedLogException(String logName) {
-        return Future.exception(new UnexpectedException("Duplicated log " + logName
+    private <A> CompletableFuture<A> duplicatedLogException(String logName) {
+        return FutureUtils.exception(new UnexpectedException("Duplicated log " + logName
                 + " found in namespace " + namespace));
     }
 
     @Override
-    public Future<URI> createLog(final String logName) {
+    public CompletableFuture<URI> createLog(final String logName) {
         if (duplicatedLogFound.get()) {
             return duplicatedLogException(duplicatedLogName.get());
         }
-        Promise<URI> createPromise = new Promise<URI>();
+        CompletableFuture<URI> createPromise = new CompletableFuture<URI>();
         doCreateLog(logName, createPromise);
         return postStateCheck(createPromise);
     }
 
-    void doCreateLog(final String logName, final Promise<URI> createPromise) {
-        getLogLocation(logName).addEventListener(new FutureEventListener<Optional<URI>>() {
+    void doCreateLog(final String logName, final CompletableFuture<URI> createPromise) {
+        getLogLocation(logName).whenComplete(new FutureEventListener<Optional<URI>>() {
             @Override
             public void onSuccess(Optional<URI> uriOptional) {
                 if (uriOptional.isPresent()) {
-                    createPromise.setException(new LogExistsException("Log " + logName + " already exists in " + uriOptional.get()));
+                    createPromise.completeExceptionally(new LogExistsException("Log " + logName + " already exists in " + uriOptional.get()));
                 } else {
                     getCachedSubNamespacesAndCreateLog(logName, createPromise);
                 }
@@ -406,14 +413,14 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
 
             @Override
             public void onFailure(Throwable cause) {
-                createPromise.setException(cause);
+                createPromise.completeExceptionally(cause);
             }
         });
     }
 
     private void getCachedSubNamespacesAndCreateLog(final String logName,
-                                                    final Promise<URI> createPromise) {
-        getCachedSubNamespaces().addEventListener(new FutureEventListener<Set<URI>>() {
+                                                    final CompletableFuture<URI> createPromise) {
+        getCachedSubNamespaces().whenComplete(new FutureEventListener<Set<URI>>() {
             @Override
             public void onSuccess(Set<URI> uris) {
                 findSubNamespaceToCreateLog(logName, uris, createPromise);
@@ -421,14 +428,14 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
 
             @Override
             public void onFailure(Throwable cause) {
-                createPromise.setException(cause);
+                createPromise.completeExceptionally(cause);
             }
         });
     }
 
     private void fetchSubNamespacesAndCreateLog(final String logName,
-                                                final Promise<URI> createPromise) {
-        fetchSubNamespaces(null).addEventListener(new FutureEventListener<Set<URI>>() {
+                                                final CompletableFuture<URI> createPromise) {
+        fetchSubNamespaces(null).whenComplete(new FutureEventListener<Set<URI>>() {
             @Override
             public void onSuccess(Set<URI> uris) {
                 findSubNamespaceToCreateLog(logName, uris, createPromise);
@@ -436,26 +443,26 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
 
             @Override
             public void onFailure(Throwable cause) {
-                createPromise.setException(cause);
+                createPromise.completeExceptionally(cause);
             }
         });
     }
 
     private void findSubNamespaceToCreateLog(final String logName,
                                              final Set<URI> uris,
-                                             final Promise<URI> createPromise) {
+                                             final CompletableFuture<URI> createPromise) {
         final List<URI> uriList = Lists.newArrayListWithExpectedSize(uris.size());
-        List<Future<Set<String>>> futureList = Lists.newArrayListWithExpectedSize(uris.size());
+        List<CompletableFuture<Set<String>>> futureList = Lists.newArrayListWithExpectedSize(uris.size());
         for (URI uri : uris) {
             SubNamespace subNs = subNamespaces.get(uri);
             if (null == subNs) {
-                createPromise.setException(new UnexpectedException("No sub namespace " + uri + " found"));
+                createPromise.completeExceptionally(new UnexpectedException("No sub namespace " + uri + " found"));
                 return;
             }
             futureList.add(subNs.getLogs());
             uriList.add(uri);
         }
-        Future.collect(futureList).addEventListener(new FutureEventListener<List<Set<String>>>() {
+        FutureUtils.collect(futureList).whenComplete(new FutureEventListener<List<Set<String>>>() {
             @Override
             public void onSuccess(List<Set<String>> resultList) {
                 for (int i = resultList.size() - 1; i >= 0; i--) {
@@ -467,7 +474,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
                     }
                 }
                 // All sub namespaces are full
-                createSubNamespace().addEventListener(new FutureEventListener<URI>() {
+                createSubNamespace().whenComplete(new FutureEventListener<URI>() {
                     @Override
                     public void onSuccess(URI uri) {
                         // the new namespace will be propagated to the namespace cache by the namespace listener
@@ -479,14 +486,14 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
 
                     @Override
                     public void onFailure(Throwable cause) {
-                        createPromise.setException(cause);
+                        createPromise.completeExceptionally(cause);
                     }
                 });
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                createPromise.setException(cause);
+                createPromise.completeExceptionally(cause);
             }
         });
     }
@@ -499,8 +506,8 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
         return SUB_NAMESPACE_PREFIX + parts[parts.length - 1];
     }
 
-    Future<URI> createSubNamespace() {
-        final Promise<URI> promise = new Promise<URI>();
+    CompletableFuture<URI> createSubNamespace() {
+        final CompletableFuture<URI> promise = new CompletableFuture<URI>();
 
         final String nsPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + SUB_NAMESPACE_PREFIX;
         try {
@@ -512,21 +519,21 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
                                 try {
                                     URI newUri = getSubNamespaceURI(getNamespaceFromZkPath(name));
                                     logger.info("Created sub namespace {}", newUri);
-                                    promise.setValue(newUri);
+                                    promise.complete(newUri);
                                 } catch (UnexpectedException ue) {
-                                    promise.setException(ue);
+                                    promise.completeExceptionally(ue);
                                 } catch (URISyntaxException e) {
-                                    promise.setException(new UnexpectedException("Invalid namespace " + name + " is created."));
+                                    promise.completeExceptionally(new UnexpectedException("Invalid namespace " + name + " is created."));
                                 }
                             } else {
-                                promise.setException(KeeperException.create(Code.get(rc)));
+                                promise.completeExceptionally(KeeperException.create(Code.get(rc)));
                             }
                         }
                     }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            promise.setException(e);
+            promise.completeExceptionally(e);
         }
 
         return promise;
@@ -545,22 +552,22 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
      */
     private void createLogInNamespace(final URI uri,
                                       final String logName,
-                                      final Promise<URI> createPromise) {
+                                      final CompletableFuture<URI> createPromise) {
         // TODO: rewrite this after we bump to zk 3.5, where we will have asynchronous version of multi
         scheduler.submit(new Runnable() {
             @Override
             public void run() {
                 try {
                     createLogInNamespaceSync(uri, logName);
-                    createPromise.setValue(uri);
+                    createPromise.complete(uri);
                 } catch (InterruptedException e) {
-                    createPromise.setException(e);
+                    createPromise.completeExceptionally(e);
                 } catch (IOException e) {
-                    createPromise.setException(e);
+                    createPromise.completeExceptionally(e);
                 } catch (KeeperException.BadVersionException bve) {
                     fetchSubNamespacesAndCreateLog(logName, createPromise);
                 } catch (KeeperException e) {
-                    createPromise.setException(e);
+                    createPromise.completeExceptionally(e);
                 }
             }
         });
@@ -617,39 +624,35 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
     }
 
     @Override
-    public Future<Optional<URI>> getLogLocation(final String logName) {
+    public CompletableFuture<Optional<URI>> getLogLocation(final String logName) {
         if (duplicatedLogFound.get()) {
             return duplicatedLogException(duplicatedLogName.get());
         }
         URI location = log2Locations.get(logName);
         if (null != location) {
-            return postStateCheck(Future.value(Optional.of(location)));
+            return postStateCheck(FutureUtils.value(Optional.of(location)));
         }
         if (!forceCheckLogExistence) {
             Optional<URI> result = Optional.absent();
-            return Future.value(result);
+            return FutureUtils.value(result);
         }
-        return postStateCheck(fetchLogLocation(logName).onSuccess(
-                new AbstractFunction1<Optional<URI>, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(Optional<URI> uriOptional) {
-                        if (uriOptional.isPresent()) {
-                            log2Locations.putIfAbsent(logName, uriOptional.get());
-                        }
-                        return BoxedUnit.UNIT;
-                    }
-                }));
+        return postStateCheck(fetchLogLocation(logName).thenApply((uriOptional) -> {
+            if (uriOptional.isPresent()) {
+                log2Locations.putIfAbsent(logName, uriOptional.get());
+            }
+            return uriOptional;
+        }));
     }
 
-    private Future<Optional<URI>> fetchLogLocation(final String logName) {
-        final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>();
+    private CompletableFuture<Optional<URI>> fetchLogLocation(final String logName) {
+        final CompletableFuture<Optional<URI>> fetchPromise = new CompletableFuture<Optional<URI>>();
 
         Set<URI> uris = subNamespaces.keySet();
-        List<Future<Optional<URI>>> fetchFutures = Lists.newArrayListWithExpectedSize(uris.size());
+        List<CompletableFuture<Optional<URI>>> fetchFutures = Lists.newArrayListWithExpectedSize(uris.size());
         for (URI uri : uris) {
             fetchFutures.add(fetchLogLocation(uri, logName));
         }
-        Future.collect(fetchFutures).addEventListener(new FutureEventListener<List<Optional<URI>>>() {
+        FutureUtils.collect(fetchFutures).whenComplete(new FutureEventListener<List<Optional<URI>>>() {
             @Override
             public void onSuccess(List<Optional<URI>> fetchResults) {
                 Optional<URI> result = Optional.absent();
@@ -660,7 +663,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
                                     new Object[] { logName, result.get(), fetchResult.get() });
                             duplicatedLogName.compareAndSet(null, logName);
                             duplicatedLogFound.set(true);
-                            fetchPromise.setException(new UnexpectedException("Log " + logName
+                            fetchPromise.completeExceptionally(new UnexpectedException("Log " + logName
                                     + " is found in multiple sub namespaces : "
                                     + result.get() + " & " + fetchResult.get()));
                             return;
@@ -669,62 +672,57 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
                         result = fetchResult;
                     }
                 }
-                fetchPromise.setValue(result);
+                fetchPromise.complete(result);
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                fetchPromise.setException(cause);
+                fetchPromise.completeExceptionally(cause);
             }
         });
         return fetchPromise;
     }
 
-    private Future<Optional<URI>> fetchLogLocation(final URI uri, String logName) {
-        final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>();
+    private CompletableFuture<Optional<URI>> fetchLogLocation(final URI uri, String logName) {
+        final CompletableFuture<Optional<URI>> fetchPromise = new CompletableFuture<Optional<URI>>();
         final String logRootPath = uri.getPath() + "/" + logName;
         try {
             zkc.get().exists(logRootPath, false, new AsyncCallback.StatCallback() {
                 @Override
                 public void processResult(int rc, String path, Object ctx, Stat stat) {
                     if (Code.OK.intValue() == rc) {
-                        fetchPromise.setValue(Optional.of(uri));
+                        fetchPromise.complete(Optional.of(uri));
                     } else if (Code.NONODE.intValue() == rc) {
-                        fetchPromise.setValue(Optional.<URI>absent());
+                        fetchPromise.complete(Optional.<URI>absent());
                     } else {
-                        fetchPromise.setException(KeeperException.create(Code.get(rc)));
+                        fetchPromise.completeExceptionally(KeeperException.create(Code.get(rc)));
                     }
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            fetchPromise.setException(e);
+            fetchPromise.completeExceptionally(e);
         } catch (InterruptedException e) {
-            fetchPromise.setException(e);
+            fetchPromise.completeExceptionally(e);
         }
         return fetchPromise;
     }
 
     @Override
-    public Future<Iterator<String>> getLogs() {
+    public CompletableFuture<Iterator<String>> getLogs() {
         if (duplicatedLogFound.get()) {
             return duplicatedLogException(duplicatedLogName.get());
         }
-        return postStateCheck(retrieveLogs().map(
-                new AbstractFunction1<List<Set<String>>, Iterator<String>>() {
-                    @Override
-                    public Iterator<String> apply(List<Set<String>> resultList) {
-                        return getIterator(resultList);
-                    }
-                }));
+        return postStateCheck(retrieveLogs().thenApply(
+            resultList -> getIterator(resultList)));
     }
 
-    private Future<List<Set<String>>> retrieveLogs() {
+    private CompletableFuture<List<Set<String>>> retrieveLogs() {
         Collection<SubNamespace> subNss = subNamespaces.values();
-        List<Future<Set<String>>> logsList = Lists.newArrayListWithExpectedSize(subNss.size());
+        List<CompletableFuture<Set<String>>> logsList = Lists.newArrayListWithExpectedSize(subNss.size());
         for (SubNamespace subNs : subNss) {
             logsList.add(subNs.getLogs());
         }
-        return Future.collect(logsList);
+        return FutureUtils.collect(logsList);
     }
 
     private Iterator<String> getIterator(List<Set<String>> resultList) {
@@ -747,13 +745,9 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
     }
 
     private void notifyOnNamespaceChanges() {
-        retrieveLogs().onSuccess(new AbstractFunction1<List<Set<String>>, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(List<Set<String>> resultList) {
-                for (NamespaceListener listener : listeners) {
-                    listener.onStreamsChanged(getIterator(resultList));
-                }
-                return BoxedUnit.UNIT;
+        retrieveLogs().thenAccept(resultList -> {
+            for (NamespaceListener listener : listeners) {
+                listener.onStreamsChanged(getIterator(resultList));
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
index 8f9913e..e45c755 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
@@ -17,25 +17,23 @@
  */
 package org.apache.distributedlog.impl.logsegment;
 
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.apache.distributedlog.bk.LedgerAllocator;
 import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
 import org.apache.distributedlog.util.Allocator;
 import org.apache.distributedlog.util.Transaction;
-import com.twitter.util.Future;
 import org.apache.bookkeeper.client.LedgerHandle;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-
-import java.io.IOException;
 
 /**
  * Allocate log segments
  */
 class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object> {
 
-    private static class NewLogSegmentEntryWriterFn extends AbstractFunction1<LedgerHandle, LogSegmentEntryWriter> {
+    private static class NewLogSegmentEntryWriterFn implements Function<LedgerHandle, LogSegmentEntryWriter> {
 
-        static final Function1<LedgerHandle, LogSegmentEntryWriter> INSTANCE =
+        static final Function<LedgerHandle, LogSegmentEntryWriter> INSTANCE =
                 new NewLogSegmentEntryWriterFn();
 
         private NewLogSegmentEntryWriterFn() {}
@@ -58,8 +56,8 @@ class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object>
     }
 
     @Override
-    public Future<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn,
-                                                   final Transaction.OpListener<LogSegmentEntryWriter> listener) {
+    public CompletableFuture<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn,
+                                                              final Transaction.OpListener<LogSegmentEntryWriter> listener) {
         return allocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>() {
             @Override
             public void onCommit(LedgerHandle lh) {
@@ -70,16 +68,16 @@ class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object>
             public void onAbort(Throwable t) {
                 listener.onAbort(t);
             }
-        }).map(NewLogSegmentEntryWriterFn.INSTANCE);
+        }).thenApply(NewLogSegmentEntryWriterFn.INSTANCE);
     }
 
     @Override
-    public Future<Void> asyncClose() {
+    public CompletableFuture<Void> asyncClose() {
         return allocator.asyncClose();
     }
 
     @Override
-    public Future<Void> delete() {
+    public CompletableFuture<Void> delete() {
         return allocator.delete();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index 034b23e..0bb91ae 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -19,6 +19,7 @@ package org.apache.distributedlog.impl.logsegment;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.Entry;
 import org.apache.distributedlog.LogSegmentMetadata;
@@ -29,10 +30,8 @@ import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
 import org.apache.distributedlog.exceptions.ReadCancelledException;
 import org.apache.distributedlog.injector.AsyncFailureInjector;
 import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -87,7 +86,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
             return done;
         }
 
-        void setValue(LedgerEntry entry) {
+        void complete(LedgerEntry entry) {
             synchronized (this) {
                 if (done) {
                     return;
@@ -98,7 +97,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
             setDone(true);
         }
 
-        void setException(int rc) {
+        void completeExceptionally(int rc) {
             synchronized (this) {
                 if (done) {
                     return;
@@ -152,16 +151,16 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
             while (entries.hasMoreElements()) {
                 // more entries are returned
                 if (null != entry) {
-                    setException(BKException.Code.UnexpectedConditionException);
+                    completeExceptionally(BKException.Code.UnexpectedConditionException);
                     return;
                 }
                 entry = entries.nextElement();
             }
             if (null == entry || entry.getEntryId() != entryId) {
-                setException(BKException.Code.UnexpectedConditionException);
+                completeExceptionally(BKException.Code.UnexpectedConditionException);
                 return;
             }
-            setValue(entry);
+            complete(entry);
         }
 
         @Override
@@ -186,7 +185,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                 return;
             }
             if (null != entry && this.entryId == entryId) {
-                setValue(entry);
+                complete(entry);
                 return;
             }
             // the long poll is timeout or interrupted; we will retry it again.
@@ -215,7 +214,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                         nextReadBackoffTime,
                         TimeUnit.MILLISECONDS);
             } else {
-                setException(rc);
+                completeExceptionally(rc);
             }
             return false;
         }
@@ -229,7 +228,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
     private class PendingReadRequest {
         private final int numEntries;
         private final List<Entry.Reader> entries;
-        private final Promise<List<Entry.Reader>> promise;
+        private final CompletableFuture<List<Entry.Reader>> promise;
 
         PendingReadRequest(int numEntries) {
             this.numEntries = numEntries;
@@ -238,15 +237,15 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
             } else {
                 this.entries = new ArrayList<Entry.Reader>();
             }
-            this.promise = new Promise<List<Entry.Reader>>();
+            this.promise = new CompletableFuture<List<Entry.Reader>>();
         }
 
-        Promise<List<Entry.Reader>> getPromise() {
+        CompletableFuture<List<Entry.Reader>> getPromise() {
             return promise;
         }
 
-        void setException(Throwable throwable) {
-            FutureUtils.setException(promise, throwable);
+        void completeExceptionally(Throwable throwable) {
+            FutureUtils.completeExceptionally(promise, throwable);
         }
 
         void addEntry(Entry.Reader entry) {
@@ -254,7 +253,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
         }
 
         void complete() {
-            FutureUtils.setValue(promise, entries);
+            FutureUtils.complete(promise, entries);
             onEntriesConsumed(entries.size());
         }
 
@@ -277,7 +276,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
     private final int numPrefetchEntries;
     private final int maxPrefetchEntries;
     // state
-    private Promise<Void> closePromise = null;
+    private CompletableFuture<Void> closePromise = null;
     private LogSegmentMetadata metadata;
     private LedgerHandle lh;
     private final List<LedgerHandle> openLedgerHandles;
@@ -457,7 +456,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
         if (isBeyondLastAddConfirmed()) {
             // if the reader is already caught up, let's fail the reader immediately
             // as we need to pull the latest metadata of this log segment.
-            setException(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), rc),
+            completeExceptionally(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), rc),
                     true);
             return;
         }
@@ -488,7 +487,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
      * @param throwable exception indicating the error
      * @param isBackground is the reader set exception by background reads or foreground reads
      */
-    private void setException(Throwable throwable, boolean isBackground) {
+    private void completeExceptionally(Throwable throwable, boolean isBackground) {
         lastException.compareAndSet(null, throwable);
         if (isBackground) {
             notifyReaders();
@@ -510,7 +509,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
             readQueue.clear();
         }
         for (PendingReadRequest request : requestsToCancel) {
-            request.setException(throwExc);
+            request.completeExceptionally(throwExc);
         }
     }
 
@@ -630,11 +629,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
     }
 
     @Override
-    public Future<List<Entry.Reader>> readNext(int numEntries) {
+    public CompletableFuture<List<Entry.Reader>> readNext(int numEntries) {
         final PendingReadRequest readRequest = new PendingReadRequest(numEntries);
 
         if (checkClosedOrInError()) {
-            readRequest.setException(lastException.get());
+            readRequest.completeExceptionally(lastException.get());
         } else {
             boolean wasQueueEmpty;
             synchronized (readQueue) {
@@ -682,9 +681,9 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
             // mark the reader in error and abort all pending reads since
             // we don't know the last consumed read
             if (null == lastException.get()) {
-                if (nextRequest.getPromise().isInterrupted().isDefined()) {
-                    setException(new DLInterruptedException("Interrupted on reading log segment "
-                            + getSegment() + " : " + nextRequest.getPromise().isInterrupted().get()), false);
+                if (nextRequest.getPromise().isCancelled()) {
+                    completeExceptionally(new DLInterruptedException("Interrupted on reading log segment "
+                            + getSegment() + " : " + nextRequest.getPromise().isCancelled()), false);
                 }
             }
 
@@ -707,11 +706,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                 } else {
                     DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from "
                             + getSegment());
-                    nextRequest.setException(ise);
+                    nextRequest.completeExceptionally(ise);
                     if (null != request) {
-                        request.setException(ise);
+                        request.completeExceptionally(ise);
                     }
-                    setException(ise, false);
+                    completeExceptionally(ise, false);
                 }
             } else {
                 if (0 == scheduleCountLocal) {
@@ -732,7 +731,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
             }
             // reach end of log segment
             if (hitEndOfLogSegment) {
-                setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
+                completeExceptionally(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
                 return;
             }
             if (null == entry) {
@@ -742,7 +741,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
             if (!entry.isDone()) {
                 // we already reached end of the log segment
                 if (isEndOfLogSegment(entry.getEntryId())) {
-                    setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
+                    completeExceptionally(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
                 }
                 return;
             }
@@ -751,13 +750,13 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                 if (entry != removedEntry) {
                     DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from "
                             + getSegment());
-                    setException(ise, false);
+                    completeExceptionally(ise, false);
                     return;
                 }
                 try {
                     nextRequest.addEntry(processReadEntry(entry.getEntry()));
                 } catch (IOException e) {
-                    setException(e, false);
+                    completeExceptionally(e, false);
                     return;
                 }
             } else if (skipBrokenEntries && BKException.Code.DigestMatchException == entry.getRc()) {
@@ -766,7 +765,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                 readAheadEntries.poll();
                 continue;
             } else {
-                setException(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId()
+                completeExceptionally(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId()
                         + " @ log segment " + getSegment(), entry.getRc()), false);
                 return;
             }
@@ -812,26 +811,29 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
     }
 
     @Override
-    public Future<Void> asyncClose() {
-        final Promise<Void> closeFuture;
+    public CompletableFuture<Void> asyncClose() {
+        final CompletableFuture<Void> closeFuture;
         ReadCancelledException exception;
         LedgerHandle[] lhsToClose;
         synchronized (this) {
             if (null != closePromise) {
                 return closePromise;
             }
-            closeFuture = closePromise = new Promise<Void>();
+            closeFuture = closePromise = new CompletableFuture<Void>();
             lhsToClose = openLedgerHandles.toArray(new LedgerHandle[openLedgerHandles.size()]);
             // set the exception to cancel pending and subsequent reads
             exception = new ReadCancelledException(getSegment().getZNodeName(), "Reader was closed");
-            setException(exception, false);
+            completeExceptionally(exception, false);
         }
 
         // cancel all pending reads
         cancelAllPendingReads(exception);
 
         // close all the open ledger
-        BKUtils.closeLedgers(lhsToClose).proxyTo(closeFuture);
+        FutureUtils.proxyTo(
+            BKUtils.closeLedgers(lhsToClose),
+            closeFuture
+        );
         return closeFuture;
     }
 }