You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:48 UTC

[16/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
index fdb29f3..1293d00 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -17,9 +17,25 @@
  */
 package org.apache.distributedlog;
 
+import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.DLIllegalStateException;
 import org.apache.distributedlog.exceptions.EndOfStreamException;
@@ -43,34 +59,14 @@ import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
 import org.apache.distributedlog.util.Allocator;
 import org.apache.distributedlog.util.DLUtils;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Transaction;
-import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.common.util.PermitLimiter;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
 
 /**
  * Log Handler for Writers.
@@ -108,7 +104,7 @@ class BKLogWriteHandler extends BKLogHandler {
     protected final boolean validateLogSegmentSequenceNumber;
     protected final int regionId;
     protected final RollingPolicy rollingPolicy;
-    protected Future<? extends DistributedLock> lockFuture = null;
+    protected CompletableFuture<? extends DistributedLock> lockFuture = null;
     protected final PermitLimiter writeLimiter;
     protected final FeatureProvider featureProvider;
     protected final DynamicDistributedLogConfiguration dynConf;
@@ -117,16 +113,16 @@ class BKLogWriteHandler extends BKLogHandler {
     protected final LinkedList<Long> inprogressLSSNs;
 
     // Fetch LogSegments State: write can continue without full list of log segments while truncation needs
-    private final Future<Versioned<List<LogSegmentMetadata>>> fetchForWrite;
-    private Future<Versioned<List<LogSegmentMetadata>>> fetchForTruncation;
+    private final CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForWrite;
+    private CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForTruncation;
 
     // Recover Functions
     private final RecoverLogSegmentFunction recoverLogSegmentFunction =
             new RecoverLogSegmentFunction();
-    private final AbstractFunction1<List<LogSegmentMetadata>, Future<Long>> recoverLogSegmentsFunction =
-            new AbstractFunction1<List<LogSegmentMetadata>, Future<Long>>() {
+    private final Function<List<LogSegmentMetadata>, CompletableFuture<Long>> recoverLogSegmentsFunction =
+            new Function<List<LogSegmentMetadata>, CompletableFuture<Long>>() {
                 @Override
-                public Future<Long> apply(List<LogSegmentMetadata> segmentList) {
+                public CompletableFuture<Long> apply(List<LogSegmentMetadata> segmentList) {
                     LOG.info("Initiating Recovery For {} : {}", getFullyQualifiedName(), segmentList);
                     // if lastLedgerRollingTimeMillis is not updated, we set it to now.
                     synchronized (BKLogWriteHandler.this) {
@@ -145,8 +141,11 @@ class BKLogWriteHandler extends BKLogHandler {
                         }
                     }
 
-                    return FutureUtils.processList(segmentList, recoverLogSegmentFunction, scheduler).map(
-                            GetLastTxIdFunction.INSTANCE);
+                    return FutureUtils.processList(
+                        segmentList,
+                        recoverLogSegmentFunction,
+                        scheduler
+                    ).thenApply(GetLastTxIdFunction.INSTANCE);
                 }
             };
 
@@ -232,30 +231,30 @@ class BKLogWriteHandler extends BKLogHandler {
         deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete");
     }
 
-    private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch(
+    private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch(
             final Comparator<LogSegmentMetadata> comparator) {
-        final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
-        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+        final CompletableFuture<List<LogSegmentMetadata>> promise = new CompletableFuture<List<LogSegmentMetadata>>();
+        fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
             @Override
             public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
+                promise.completeExceptionally(cause);
             }
 
             @Override
             public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
                 try {
-                    FutureUtils.setValue(promise, getCachedLogSegments(comparator));
+                    promise.complete(getCachedLogSegments(comparator));
                 } catch (UnexpectedException e) {
-                    FutureUtils.setException(promise, e);
+                    promise.completeExceptionally(e);
                 }
             }
         });
         return promise;
     }
 
-    private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch(
+    private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch(
             final Comparator<LogSegmentMetadata> comparator) {
-        Future<Versioned<List<LogSegmentMetadata>>> result;
+        CompletableFuture<Versioned<List<LogSegmentMetadata>>> result;
         synchronized (this) {
             if (null == fetchForTruncation) {
                 fetchForTruncation = readLogSegmentsFromStore(
@@ -266,19 +265,19 @@ class BKLogWriteHandler extends BKLogHandler {
             result = fetchForTruncation;
         }
 
-        final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
-        result.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+        final CompletableFuture<List<LogSegmentMetadata>> promise = new CompletableFuture<List<LogSegmentMetadata>>();
+        result.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
             @Override
             public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
+                FutureUtils.completeExceptionally(promise, cause);
             }
 
             @Override
             public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
                 try {
-                    FutureUtils.setValue(promise, getCachedLogSegments(comparator));
+                    FutureUtils.complete(promise, getCachedLogSegments(comparator));
                 } catch (UnexpectedException e) {
-                    FutureUtils.setException(promise, e);
+                    FutureUtils.completeExceptionally(promise, e);
                 }
             }
         });
@@ -374,7 +373,7 @@ class BKLogWriteHandler extends BKLogHandler {
      *
      * @return future represents the lock result
      */
-    Future<? extends DistributedLock> lockHandler() {
+    CompletableFuture<? extends DistributedLock> lockHandler() {
         if (null != lockFuture) {
             return lockFuture;
         }
@@ -382,11 +381,11 @@ class BKLogWriteHandler extends BKLogHandler {
         return lockFuture;
     }
 
-    Future<Void> unlockHandler() {
+    CompletableFuture<Void> unlockHandler() {
         if (null != lockFuture) {
             return lock.asyncClose();
         } else {
-            return Future.Void();
+            return FutureUtils.Void();
         }
     }
 
@@ -483,23 +482,23 @@ class BKLogWriteHandler extends BKLogHandler {
     }
 
     protected BKLogSegmentWriter doStartLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) throws IOException {
-        return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID));
+        return Utils.ioResult(asyncStartLogSegment(txId, bestEffort, allowMaxTxID));
     }
 
-    protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
+    protected CompletableFuture<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
                                                               final boolean bestEffort,
                                                               final boolean allowMaxTxID) {
-        final Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>();
+        final CompletableFuture<BKLogSegmentWriter> promise = new CompletableFuture<BKLogSegmentWriter>();
         try {
             lock.checkOwnershipAndReacquire();
         } catch (LockingException e) {
-            FutureUtils.setException(promise, e);
+            FutureUtils.completeExceptionally(promise, e);
             return promise;
         }
-        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+        fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
             @Override
             public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
+                FutureUtils.completeExceptionally(promise, cause);
             }
 
             @Override
@@ -513,11 +512,11 @@ class BKLogWriteHandler extends BKLogHandler {
     protected void doStartLogSegment(final long txId,
                                      final boolean bestEffort,
                                      final boolean allowMaxTxID,
-                                     final Promise<BKLogSegmentWriter> promise) {
+                                     final CompletableFuture<BKLogSegmentWriter> promise) {
         // validate the tx id
         if ((txId < 0) ||
                 (!allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID))) {
-            FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId));
+            FutureUtils.completeExceptionally(promise, new IOException("Invalid Transaction Id " + txId));
             return;
         }
 
@@ -525,11 +524,11 @@ class BKLogWriteHandler extends BKLogHandler {
         if (txId < highestTxIdWritten) {
             if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
                 LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
-                FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
+                FutureUtils.completeExceptionally(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
                 return;
             } else {
                 LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
-                FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
+                FutureUtils.completeExceptionally(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
                 return;
             }
         }
@@ -554,7 +553,7 @@ class BKLogWriteHandler extends BKLogHandler {
         }
 
         logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER)
-                .addEventListener(new FutureEventListener<LogSegmentEntryWriter>() {
+                .whenComplete(new FutureEventListener<LogSegmentEntryWriter>() {
 
             @Override
             public void onSuccess(LogSegmentEntryWriter entryWriter) {
@@ -574,13 +573,13 @@ class BKLogWriteHandler extends BKLogHandler {
         });
     }
 
-    private void failStartLogSegment(Promise<BKLogSegmentWriter> promise,
+    private void failStartLogSegment(CompletableFuture<BKLogSegmentWriter> promise,
                                      boolean bestEffort,
                                      Throwable cause) {
         if (bestEffort) {
-            FutureUtils.setValue(promise, null);
+            FutureUtils.complete(promise, null);
         } else {
-            FutureUtils.setException(promise, cause);
+            FutureUtils.completeExceptionally(promise, cause);
         }
     }
 
@@ -591,7 +590,7 @@ class BKLogWriteHandler extends BKLogHandler {
                                             final long txId,
                                             final LogSegmentEntryWriter entryWriter,
                                             boolean bestEffort,
-                                            final Promise<BKLogSegmentWriter> promise) {
+                                            final CompletableFuture<BKLogSegmentWriter> promise) {
         final long logSegmentSeqNo;
         try {
             FailpointUtils.checkFailPoint(
@@ -626,12 +625,12 @@ class BKLogWriteHandler extends BKLogHandler {
         LOG.debug("Try storing MaxTxId in startLogSegment  {} {}", inprogressZnodePath, txId);
         storeMaxTxId(txn, maxTxId, txId);
 
-        txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
+        txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
 
             @Override
             public void onSuccess(Void value) {
                 try {
-                    FutureUtils.setValue(promise, new BKLogSegmentWriter(
+                    FutureUtils.complete(promise, new BKLogSegmentWriter(
                             getFullyQualifiedName(),
                             l.getSegmentName(),
                             conf,
@@ -656,7 +655,7 @@ class BKLogWriteHandler extends BKLogHandler {
             public void onFailure(Throwable cause) {
                 failStartLogSegment(promise, false, cause);
             }
-        }, scheduler));
+        }, scheduler);
     }
 
     boolean shouldStartNewSegment(BKLogSegmentWriter writer) {
@@ -672,21 +671,21 @@ class BKLogWriteHandler extends BKLogHandler {
      * the firstTxId of the ledger matches firstTxId for the segment we are
      * trying to finalize.
      */
-    Future<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) {
-        final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+    CompletableFuture<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) {
+        final CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
         completeAndCloseLogSegment(writer, promise);
         return promise;
     }
 
     private void completeAndCloseLogSegment(final BKLogSegmentWriter writer,
-                                            final Promise<LogSegmentMetadata> promise) {
-        writer.asyncClose().addEventListener(new FutureEventListener<Void>() {
+                                            final CompletableFuture<LogSegmentMetadata> promise) {
+        writer.asyncClose().whenComplete(new FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
                 // in theory closeToFinalize should throw exception if a stream is in error.
                 // just in case, add another checking here to make sure we don't close log segment is a stream is in error.
                 if (writer.shouldFailCompleteLogSegment()) {
-                    FutureUtils.setException(promise,
+                    FutureUtils.completeExceptionally(promise,
                             new IOException("LogSegmentWriter for " + writer.getFullyQualifiedLogSegment() + " is already in error."));
                     return;
                 }
@@ -704,7 +703,7 @@ class BKLogWriteHandler extends BKLogHandler {
 
             @Override
             public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
+                FutureUtils.completeExceptionally(promise, cause);
             }
         });
     }
@@ -791,7 +790,7 @@ class BKLogWriteHandler extends BKLogHandler {
             int recordCount,
             long lastEntryId,
             long lastSlotId) throws IOException {
-        Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+        CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
         doCompleteAndCloseLogSegment(
                 inprogressZnodeName,
                 logSegmentSeqNo,
@@ -802,7 +801,7 @@ class BKLogWriteHandler extends BKLogHandler {
                 lastEntryId,
                 lastSlotId,
                 promise);
-        return FutureUtils.result(promise);
+        return Utils.ioResult(promise);
     }
 
     protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName,
@@ -813,11 +812,11 @@ class BKLogWriteHandler extends BKLogHandler {
                                                 final int recordCount,
                                                 final long lastEntryId,
                                                 final long lastSlotId,
-                                                final Promise<LogSegmentMetadata> promise) {
-        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+                                                final CompletableFuture<LogSegmentMetadata> promise) {
+        fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
             @Override
             public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
+                FutureUtils.completeExceptionally(promise, cause);
             }
 
             @Override
@@ -845,11 +844,11 @@ class BKLogWriteHandler extends BKLogHandler {
             int recordCount,
             long lastEntryId,
             long lastSlotId,
-            final Promise<LogSegmentMetadata> promise) {
+            final CompletableFuture<LogSegmentMetadata> promise) {
         try {
             lock.checkOwnershipAndReacquire();
         } catch (IOException ioe) {
-            FutureUtils.setException(promise, ioe);
+            FutureUtils.completeExceptionally(promise, ioe);
             return;
         }
 
@@ -858,7 +857,7 @@ class BKLogWriteHandler extends BKLogHandler {
 
         // validate log segment
         if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
-            FutureUtils.setException(promise, new IOException(
+            FutureUtils.completeExceptionally(promise, new IOException(
                 "Active ledger has different ID to inprogress. "
                     + inprogressLogSegment.getLogSegmentId() + " found, "
                     + logSegmentId + " expected"));
@@ -866,7 +865,7 @@ class BKLogWriteHandler extends BKLogHandler {
         }
         // validate the transaction id
         if (inprogressLogSegment.getFirstTxId() != firstTxId) {
-            FutureUtils.setException(promise, new IOException("Transaction id not as expected, "
+            FutureUtils.completeExceptionally(promise, new IOException("Transaction id not as expected, "
                 + inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
             return;
         }
@@ -874,7 +873,7 @@ class BKLogWriteHandler extends BKLogHandler {
         if (validateLogSegmentSequenceNumber) {
             synchronized (inprogressLSSNs) {
                 if (inprogressLSSNs.isEmpty()) {
-                    FutureUtils.setException(promise, new UnexpectedException(
+                    FutureUtils.completeExceptionally(promise, new UnexpectedException(
                             "Didn't find matched inprogress log segments when completing inprogress "
                                     + inprogressLogSegment));
                     return;
@@ -886,7 +885,7 @@ class BKLogWriteHandler extends BKLogHandler {
                 // it should also be same as the least inprogress log segment sequence number tracked in {@link inprogressLSSNs}
                 if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo) ||
                         (leastInprogressLSSN != logSegmentSeqNo)) {
-                    FutureUtils.setException(promise, new UnexpectedException(
+                    FutureUtils.completeExceptionally(promise, new UnexpectedException(
                             "Didn't find matched inprogress log segments when completing inprogress "
                                     + inprogressLogSegment));
                     return;
@@ -906,7 +905,7 @@ class BKLogWriteHandler extends BKLogHandler {
             LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
                     new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
             if (validateLogSegmentSequenceNumber) {
-                FutureUtils.setException(promise, new DLIllegalStateException("Unexpected max log segment sequence number "
+                FutureUtils.completeExceptionally(promise, new DLIllegalStateException("Unexpected max log segment sequence number "
                         + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
                         + ", expected " + (logSegmentSeqNo - 1)));
                 return;
@@ -919,7 +918,7 @@ class BKLogWriteHandler extends BKLogHandler {
         try {
             startSequenceId = computeStartSequenceId(inprogressLogSegment);
         } catch (IOException ioe) {
-            FutureUtils.setException(promise, ioe);
+            FutureUtils.completeExceptionally(promise, ioe);
             return;
         }
         // write completed ledger znode
@@ -946,50 +945,45 @@ class BKLogWriteHandler extends BKLogHandler {
         LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", pathForCompletedLedger, lastTxId);
         storeMaxTxId(txn, maxTxId, lastTxId);
 
-        txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
+        txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
                 LOG.info("Completed {} to {} for {} : {}",
                         new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
                                 getFullyQualifiedName(), completedLogSegment });
-                FutureUtils.setValue(promise, completedLogSegment);
+                FutureUtils.complete(promise, completedLogSegment);
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
+                FutureUtils.completeExceptionally(promise, cause);
             }
-        }, scheduler));
+        }, scheduler);
     }
 
-    public Future<Long> recoverIncompleteLogSegments() {
+    public CompletableFuture<Long> recoverIncompleteLogSegments() {
         try {
             FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
         } catch (IOException ioe) {
-            return Future.exception(ioe);
+            return FutureUtils.exception(ioe);
         }
-        return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).flatMap(recoverLogSegmentsFunction);
+        return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).thenCompose(recoverLogSegmentsFunction);
     }
 
-    class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, Future<LogSegmentMetadata>> {
+    class RecoverLogSegmentFunction implements Function<LogSegmentMetadata, CompletableFuture<LogSegmentMetadata>> {
 
         @Override
-        public Future<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
+        public CompletableFuture<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
             if (!l.isInProgress()) {
-                return Future.value(l);
+                return FutureUtils.value(l);
             }
 
             LOG.info("Recovering last record in log segment {} for {}.", l, getFullyQualifiedName());
-            return asyncReadLastRecord(l, true, true, true).flatMap(
-                    new AbstractFunction1<LogRecordWithDLSN, Future<LogSegmentMetadata>>() {
-                        @Override
-                        public Future<LogSegmentMetadata> apply(LogRecordWithDLSN lastRecord) {
-                            return completeLogSegment(l, lastRecord);
-                        }
-                    });
+            return asyncReadLastRecord(l, true, true, true).thenCompose(
+                lastRecord -> completeLogSegment(l, lastRecord));
         }
 
-        private Future<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
+        private CompletableFuture<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
                                                               LogRecordWithDLSN lastRecord) {
             LOG.info("Recovered last record in log segment {} for {}.", l, getFullyQualifiedName());
 
@@ -1009,14 +1003,14 @@ class BKLogWriteHandler extends BKLogHandler {
                 LOG.error("Unrecoverable corruption has occurred in segment "
                     + l.toString() + " at path " + l.getZkPath()
                     + ". Unable to continue recovery.");
-                return Future.exception(new IOException("Unrecoverable corruption,"
+                return FutureUtils.exception(new IOException("Unrecoverable corruption,"
                     + " please check logs."));
             } else if (endTxId == DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
                 // TODO: Empty ledger - Ideally we should just remove it?
                 endTxId = l.getFirstTxId();
             }
 
-            Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+            CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
             doCompleteAndCloseLogSegment(
                     l.getZNodeName(),
                     l.getLogSegmentSequenceNumber(),
@@ -1032,21 +1026,16 @@ class BKLogWriteHandler extends BKLogHandler {
 
     }
 
-    Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
+    CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
         if (DLSN.InvalidDLSN == dlsn) {
             List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
-            return Future.value(emptyList);
+            return FutureUtils.value(emptyList);
         }
-        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
-                new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
-                    @Override
-                    public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
-                        return setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn);
-                    }
-                });
+        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
+            logSegments -> setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn));
     }
 
-    private Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
+    private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
                                                                                   final DLSN dlsn) {
         LOG.debug("Setting truncation status on logs older than {} from {} for {}",
                 new Object[]{dlsn, logSegments, getFullyQualifiedName()});
@@ -1064,7 +1053,7 @@ class BKLogWriteHandler extends BKLogHandler {
                     if (null != partialTruncate) {
                         String logMsg = String.format("Potential metadata inconsistency for stream %s at segment %s", getFullyQualifiedName(), l);
                         LOG.error(logMsg);
-                        return Future.exception(new DLIllegalStateException(logMsg));
+                        return FutureUtils.exception(new DLIllegalStateException(logMsg));
                     }
                     LOG.info("{}: Partially truncating log segment {} older than {}.", new Object[] {getFullyQualifiedName(), l, dlsn});
                     partialTruncate = l;
@@ -1096,15 +1085,15 @@ class BKLogWriteHandler extends BKLogHandler {
         }
     }
 
-    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
+    CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
         if (minTimestampToKeep >= Utils.nowInMillis()) {
-            return Future.exception(new IllegalArgumentException(
+            return FutureUtils.exception(new IllegalArgumentException(
                     "Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
         }
-        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
-                new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
+        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
+                new Function<List<LogSegmentMetadata>, CompletableFuture<List<LogSegmentMetadata>>>() {
             @Override
-            public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
+            public CompletableFuture<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
                 List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size());
 
                 int numCandidates = getNumCandidateLogSegmentsToPurge(logSegments);
@@ -1129,38 +1118,35 @@ class BKLogWriteHandler extends BKLogHandler {
         });
     }
 
-    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
-        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
-            new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
-                @Override
-                public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
-                    int numLogSegmentsToProcess;
+    CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
+        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
+            logSegments -> {
+                int numLogSegmentsToProcess;
 
-                    if (minTxIdToKeep < 0) {
-                        // we are deleting the log, we can remove whole log segments
-                        numLogSegmentsToProcess = logSegments.size();
+                if (minTxIdToKeep < 0) {
+                    // we are deleting the log, we can remove whole log segments
+                    numLogSegmentsToProcess = logSegments.size();
+                } else {
+                    numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments);
+                }
+                List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess);
+                for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
+                    LogSegmentMetadata l = logSegments.get(iterator);
+                    if ((minTxIdToKeep < 0) ||
+                        ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
+                        !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
+                        purgeList.add(l);
                     } else {
-                        numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments);
-                    }
-                    List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess);
-                    for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
-                        LogSegmentMetadata l = logSegments.get(iterator);
-                        if ((minTxIdToKeep < 0) ||
-                            ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
-                            !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
-                            purgeList.add(l);
-                        } else {
-                            // stop truncating log segments if we find either an inprogress or a partially
-                            // truncated log segment
-                            break;
-                        }
+                        // stop truncating log segments if we find either an inprogress or a partially
+                        // truncated log segment
+                        break;
                     }
-                    return deleteLogSegments(purgeList);
                 }
+                return deleteLogSegments(purgeList);
             });
     }
 
-    private Future<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
+    private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
             final List<LogSegmentMetadata> truncateList,
             LogSegmentMetadata partialTruncate,
             DLSN minActiveDLSN) {
@@ -1183,39 +1169,31 @@ class BKLogWriteHandler extends BKLogHandler {
             listAfterTruncated.add(newSegment);
         }
 
-        return updateTxn.execute().map(new AbstractFunction1<Void, List<LogSegmentMetadata>>() {
-            @Override
-            public List<LogSegmentMetadata> apply(Void value) {
-                for (int i = 0; i < listToTruncate.size(); i++) {
-                    removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName());
-                    LogSegmentMetadata newSegment = listAfterTruncated.get(i);
-                    addLogSegmentToCache(newSegment.getSegmentName(), newSegment);
-                }
-                return listAfterTruncated;
+        return updateTxn.execute().thenApply(value -> {
+            for (int i = 0; i < listToTruncate.size(); i++) {
+                removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName());
+                LogSegmentMetadata newSegment = listAfterTruncated.get(i);
+                addLogSegmentToCache(newSegment.getSegmentName(), newSegment);
             }
+            return listAfterTruncated;
         });
     }
 
-    private Future<List<LogSegmentMetadata>> deleteLogSegments(
+    private CompletableFuture<List<LogSegmentMetadata>> deleteLogSegments(
             final List<LogSegmentMetadata> logs) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
         }
         return FutureUtils.processList(logs,
-                new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() {
-            @Override
-            public Future<LogSegmentMetadata> apply(LogSegmentMetadata segment) {
-                return deleteLogSegment(segment);
-            }
-        }, scheduler);
+            segment -> deleteLogSegment(segment), scheduler);
     }
 
-    private Future<LogSegmentMetadata> deleteLogSegment(
+    private CompletableFuture<LogSegmentMetadata> deleteLogSegment(
             final LogSegmentMetadata ledgerMetadata) {
         LOG.info("Deleting ledger {} for {}", ledgerMetadata, getFullyQualifiedName());
-        final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+        final CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        promise.addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+        promise.whenComplete(new FutureEventListener<LogSegmentMetadata>() {
             @Override
             public void onSuccess(LogSegmentMetadata segment) {
                 deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
@@ -1227,10 +1205,10 @@ class BKLogWriteHandler extends BKLogHandler {
             }
         });
         entryStore.deleteLogSegment(ledgerMetadata)
-                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+                .whenComplete(new FutureEventListener<LogSegmentMetadata>() {
             @Override
             public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
+                FutureUtils.completeExceptionally(promise, cause);
             }
 
             @Override
@@ -1242,14 +1220,14 @@ class BKLogWriteHandler extends BKLogHandler {
     }
 
     private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
-                                          final Promise<LogSegmentMetadata> promise) {
+                                          final CompletableFuture<LogSegmentMetadata> promise) {
         Transaction<Object> deleteTxn = metadataStore.transaction();
         metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() {
             @Override
             public void onCommit(Void r) {
                 // purge log segment
                 removeLogSegmentFromCache(segmentMetadata.getZNodeName());
-                promise.setValue(segmentMetadata);
+                promise.complete(segmentMetadata);
             }
 
             @Override
@@ -1257,12 +1235,12 @@ class BKLogWriteHandler extends BKLogHandler {
                 if (t instanceof LogSegmentNotFoundException) {
                     // purge log segment
                     removeLogSegmentFromCache(segmentMetadata.getZNodeName());
-                    promise.setValue(segmentMetadata);
+                    promise.complete(segmentMetadata);
                     return;
                 } else {
                     LOG.error("Couldn't purge {} for {}: with error {}",
                             new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
-                    promise.setException(t);
+                    promise.completeExceptionally(t);
                 }
             }
         });
@@ -1270,14 +1248,14 @@ class BKLogWriteHandler extends BKLogHandler {
     }
 
     @Override
-    public Future<Void> asyncClose() {
+    public CompletableFuture<Void> asyncClose() {
         return Utils.closeSequence(scheduler,
                 lock,
                 logSegmentAllocator);
     }
 
     @Override
-    public Future<Void> asyncAbort() {
+    public CompletableFuture<Void> asyncAbort() {
         return asyncClose();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
index bf89823..47301b5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
@@ -20,17 +20,15 @@ package org.apache.distributedlog;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Ticker;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.exceptions.EndOfStreamException;
 import org.apache.distributedlog.exceptions.IdleReaderException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -48,7 +46,7 @@ class BKSyncLogReader implements LogReader, AsyncNotification {
     private final AtomicReference<IOException> readerException =
             new AtomicReference<IOException>(null);
     private final int maxReadAheadWaitTime;
-    private Promise<Void> closeFuture;
+    private CompletableFuture<Void> closeFuture;
     private final Optional<Long> startTransactionId;
     private boolean positioned = false;
     private Entry.Reader currentEntry = null;
@@ -101,13 +99,10 @@ class BKSyncLogReader implements LogReader, AsyncNotification {
                     bkdlm.alertStatsLogger);
         readHandler.registerListener(readAheadReader);
         readHandler.asyncStartFetchLogSegments()
-                .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
-                        readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
-                        readAheadReader.start(logSegments.getValue());
-                        return BoxedUnit.UNIT;
-                    }
+                .thenApply(logSegments -> {
+                    readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
+                    readAheadReader.start(logSegments.getValue());
+                    return null;
                 });
     }
 
@@ -234,26 +229,28 @@ class BKSyncLogReader implements LogReader, AsyncNotification {
     }
 
     @Override
-    public Future<Void> asyncClose() {
-        Promise<Void> closePromise;
+    public CompletableFuture<Void> asyncClose() {
+        CompletableFuture<Void> closePromise;
         synchronized (this) {
             if (null != closeFuture) {
                 return closeFuture;
             }
-            closeFuture = closePromise = new Promise<Void>();
+            closeFuture = closePromise = new CompletableFuture<Void>();
         }
         readHandler.unregisterListener(readAheadReader);
         readAheadReader.removeStateChangeNotification(this);
-        Utils.closeSequence(bkdlm.getScheduler(), true,
-                readAheadReader,
-                readHandler
-        ).proxyTo(closePromise);
+        FutureUtils.proxyTo(
+            Utils.closeSequence(bkdlm.getScheduler(), true,
+                    readAheadReader,
+                    readHandler
+            ),
+            closePromise);
         return closePromise;
     }
 
     @Override
     public void close() throws IOException {
-        FutureUtils.result(asyncClose());
+        Utils.ioResult(asyncClose());
     }
 
     //

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
index 7d33d12..15296b2 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
@@ -17,11 +17,11 @@
  */
 package org.apache.distributedlog;
 
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.util.FutureUtils;
-
 import java.io.IOException;
 import java.util.List;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.util.Utils;
 
 class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
 
@@ -59,7 +59,7 @@ class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
      */
     @Override
     public void markEndOfStream() throws IOException {
-        FutureUtils.result(getLedgerWriter(DistributedLogConstants.MAX_TXID, true).markEndOfStream());
+        Utils.ioResult(getLedgerWriter(DistributedLogConstants.MAX_TXID, true).markEndOfStream());
         closeAndComplete();
     }
 
@@ -73,7 +73,7 @@ class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
         long highestTransactionId = 0;
         BKLogSegmentWriter writer = getCachedLogWriter();
         if (null != writer) {
-            highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.flush()));
+            highestTransactionId = Math.max(highestTransactionId, Utils.ioResult(writer.flush()));
         }
         return highestTransactionId;
     }
@@ -93,7 +93,7 @@ class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
         long highestTransactionId = 0;
         BKLogSegmentWriter writer = getCachedLogWriter();
         if (null != writer) {
-            highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.commit()));
+            highestTransactionId = Math.max(highestTransactionId, Utils.ioResult(writer.commit()));
             LOG.debug("FlushAndSync Completed");
         } else {
             LOG.debug("FlushAndSync Completed - Nothing to Flush");

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
index 6ed662b..3715327 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
@@ -17,30 +17,28 @@
  */
 package org.apache.distributedlog;
 
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 
 class BKTransmitPacket {
 
     private final EntryBuffer recordSet;
     private final long transmitTime;
-    private final Promise<Integer> transmitComplete;
+    private final CompletableFuture<Integer> transmitComplete;
 
     BKTransmitPacket(EntryBuffer recordSet) {
         this.recordSet = recordSet;
         this.transmitTime = System.nanoTime();
-        this.transmitComplete = new Promise<Integer>();
+        this.transmitComplete = new CompletableFuture<Integer>();
     }
 
     EntryBuffer getRecordSet() {
         return recordSet;
     }
 
-    Promise<Integer> getTransmitFuture() {
+    CompletableFuture<Integer> getTransmitFuture() {
         return transmitComplete;
     }
 
@@ -53,7 +51,7 @@ class BKTransmitPacket {
      *          transmit result code.
      */
     public void notifyTransmitComplete(int transmitResult) {
-        transmitComplete.setValue(transmitResult);
+        transmitComplete.complete(transmitResult);
     }
 
     /**
@@ -66,7 +64,7 @@ class BKTransmitPacket {
      * @see #awaitTransmitComplete(long, TimeUnit)
      */
     void addTransmitCompleteListener(FutureEventListener<Integer> transmitCompleteListener) {
-        transmitComplete.addEventListener(transmitCompleteListener);
+        transmitComplete.whenComplete(transmitCompleteListener);
     }
 
     /**
@@ -79,8 +77,7 @@ class BKTransmitPacket {
      */
     int awaitTransmitComplete(long timeout, TimeUnit unit)
         throws Exception {
-        return Await.result(transmitComplete,
-                Duration.fromTimeUnit(timeout, unit));
+        return FutureUtils.result(transmitComplete, timeout, unit);
     }
 
     public long getTransmitTime() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
index a7b17f4..2ea3b5d 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
@@ -18,17 +18,7 @@
 package org.apache.distributedlog;
 
 import com.google.common.base.Optional;
-import org.apache.distributedlog.ZooKeeperClient.Credentials;
-import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
-import org.apache.distributedlog.exceptions.AlreadyClosedException;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.net.NetUtils;
-import org.apache.distributedlog.util.ConfUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -41,6 +31,14 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.RetryPolicy;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.distributedlog.ZooKeeperClient.Credentials;
+import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.net.NetUtils;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.zookeeper.KeeperException;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
@@ -198,52 +196,52 @@ public class BookKeeperClient {
     }
 
     // Util functions
-    public Future<LedgerHandle> createLedger(int ensembleSize,
-                                             int writeQuorumSize,
-                                             int ackQuorumSize) {
+    public CompletableFuture<LedgerHandle> createLedger(int ensembleSize,
+                                                        int writeQuorumSize,
+                                                        int ackQuorumSize) {
         BookKeeper bk;
         try {
             bk = get();
         } catch (IOException ioe) {
-            return Future.exception(ioe);
+            return FutureUtils.exception(ioe);
         }
-        final Promise<LedgerHandle> promise = new Promise<LedgerHandle>();
+        final CompletableFuture<LedgerHandle> promise = new CompletableFuture<LedgerHandle>();
         bk.asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize,
                 BookKeeper.DigestType.CRC32, passwd, new AsyncCallback.CreateCallback() {
                     @Override
                     public void createComplete(int rc, LedgerHandle lh, Object ctx) {
                         if (BKException.Code.OK == rc) {
-                            promise.updateIfEmpty(new Return<LedgerHandle>(lh));
+                            promise.complete(lh);
                         } else {
-                            promise.updateIfEmpty(new Throw<LedgerHandle>(BKException.create(rc)));
+                            promise.completeExceptionally(BKException.create(rc));
                         }
                     }
                 }, null);
         return promise;
     }
 
-    public Future<Void> deleteLedger(long lid,
+    public CompletableFuture<Void> deleteLedger(long lid,
                                      final boolean ignoreNonExistentLedger) {
         BookKeeper bk;
         try {
             bk = get();
         } catch (IOException ioe) {
-            return Future.exception(ioe);
+            return FutureUtils.exception(ioe);
         }
-        final Promise<Void> promise = new Promise<Void>();
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
         bk.asyncDeleteLedger(lid, new AsyncCallback.DeleteCallback() {
             @Override
             public void deleteComplete(int rc, Object ctx) {
                 if (BKException.Code.OK == rc) {
-                    promise.updateIfEmpty(new Return<Void>(null));
+                    promise.complete(null);
                 } else if (BKException.Code.NoSuchLedgerExistsException == rc) {
                     if (ignoreNonExistentLedger) {
-                        promise.updateIfEmpty(new Return<Void>(null));
+                        promise.complete(null);
                     } else {
-                        promise.updateIfEmpty(new Throw<Void>(BKException.create(rc)));
+                        promise.completeExceptionally(BKException.create(rc));
                     }
                 } else {
-                    promise.updateIfEmpty(new Throw<Void>(BKException.create(rc)));
+                    promise.completeExceptionally(BKException.create(rc));
                 }
             }
         }, null);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
index 0cb608f..3269f57 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import org.apache.distributedlog.bk.QuorumConfig;
 import org.apache.distributedlog.feature.DefaultFeatureProvider;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.net.DNSResolverForRacks;
 import org.apache.distributedlog.net.DNSResolverForRows;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -1351,7 +1351,6 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
      * <p>
      * The setting is only applied for v2 implementation.
      *
-     * @see org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor
      * @return number of resource release threads used by distributedlog namespace.
      */
     public int getNumResourceReleaseThreads() {
@@ -3048,7 +3047,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
     /**
      * Whether to enable per stream stat or not.
      *
-     * @deprecated please use {@link DistributedLogNamespaceBuilder#perLogStatsLogger(StatsLogger)}
+     * @deprecated please use {@link NamespaceBuilder#perLogStatsLogger(StatsLogger)}
      * @return flag to enable per stream stat.
      */
     public boolean getEnablePerStreamStat() {
@@ -3058,7 +3057,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
     /**
      * Set the flag to enable per stream stat or not.
      *
-     * @deprecated please use {@link DistributedLogNamespaceBuilder#perLogStatsLogger(StatsLogger)}
+     * @deprecated please use {@link NamespaceBuilder#perLogStatsLogger(StatsLogger)}
      * @param enabled
      *          flag to enable/disable per stream stat.
      * @return dl configuration.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
deleted file mode 100644
index 7d33e9c..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.callback.LogSegmentListener;
-import org.apache.distributedlog.io.AsyncCloseable;
-import org.apache.distributedlog.namespace.NamespaceDriver;
-import org.apache.distributedlog.subscription.SubscriptionStateStore;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.util.Future;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A DistributedLogManager is responsible for managing a single place of storing
- * edit logs. It may correspond to multiple files, a backup node, etc.
- * Even when the actual underlying storage is rolled, or failed and restored,
- * each conceptual place of storage corresponds to exactly one instance of
- * this class, which is created when the EditLog is first opened.
- */
-public interface DistributedLogManager extends AsyncCloseable, Closeable {
-
-    /**
-     * Get the name of the stream managed by this log manager
-     * @return streamName
-     */
-    public String getStreamName();
-
-    /**
-     * Get the namespace driver used by this manager.
-     *
-     * @return the namespace driver
-     */
-    public NamespaceDriver getNamespaceDriver();
-
-    /**
-     * Get log segments.
-     *
-     * @return log segments
-     * @throws IOException
-     */
-    public List<LogSegmentMetadata> getLogSegments() throws IOException;
-
-    /**
-     * Register <i>listener</i> on log segment updates of this stream.
-     *
-     * @param listener
-     *          listener to receive update log segment list.
-     */
-    public void registerListener(LogSegmentListener listener) throws IOException ;
-
-    /**
-     * Unregister <i>listener</i> on log segment updates from this stream.
-     *
-     * @param listener
-     *          listener to receive update log segment list.
-     */
-    public void unregisterListener(LogSegmentListener listener);
-
-    /**
-     * Open async log writer to write records to the log stream.
-     *
-     * @return result represents the open result
-     */
-    public Future<AsyncLogWriter> openAsyncLogWriter();
-
-    /**
-     * Begin writing to the log stream identified by the name
-     *
-     * @return the writer interface to generate log records
-     */
-    public LogWriter startLogSegmentNonPartitioned() throws IOException;
-
-    /**
-     * Begin writing to the log stream identified by the name
-     *
-     * @return the writer interface to generate log records
-     */
-    // @Deprecated
-    public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException;
-
-    /**
-     * Begin appending to the end of the log stream which is being treated as a sequence of bytes
-     *
-     * @return the writer interface to generate log records
-     */
-    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException;
-
-    /**
-     * Get a reader to read a log stream as a sequence of bytes
-     *
-     * @return the writer interface to generate log records
-     */
-    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException;
-
-    /**
-     * Get the input stream starting with fromTxnId for the specified log
-     *
-     * @param fromTxnId - the first transaction id we want to read
-     * @return the stream starting with transaction fromTxnId
-     * @throws IOException if a stream cannot be found.
-     */
-    public LogReader getInputStream(long fromTxnId)
-        throws IOException;
-
-    public LogReader getInputStream(DLSN fromDLSN) throws IOException;
-
-    /**
-     * Open an async log reader to read records from a log starting from <code>fromTxnId</code>.
-     *
-     * @param fromTxnId
-     *          transaction id to start reading from
-     * @return async log reader
-     */
-    public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId);
-
-    /**
-     * Open an async log reader to read records from a log starting from <code>fromDLSN</code>
-     *
-     * @param fromDLSN
-     *          dlsn to start reading from
-     * @return async log reader
-     */
-    public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN);
-
-    // @Deprecated
-    public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException;
-
-    // @Deprecated
-    public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException;
-
-    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN);
-
-    /**
-     * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>.
-     * If two readers tried to open using same subscriberId, one would succeed, while the other
-     * will be blocked until it gets the lock.
-     *
-     * @param fromDLSN
-     *          start dlsn
-     * @param subscriberId
-     *          subscriber id
-     * @return async log reader
-     */
-    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId);
-
-    /**
-     * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from
-     * its last commit position recorded in subscription store. If no last commit position found
-     * in subscription store, it would start reading from head of the stream.
-     *
-     * If the two readers tried to open using same subscriberId, one would succeed, while the other
-     * will be blocked until it gets the lock.
-     *
-     * @param subscriberId
-     *          subscriber id
-     * @return async log reader
-     */
-    public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId);
-
-    /**
-     * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>.
-     *
-     * @param transactionId
-     *          transaction id
-     * @return dlsn of first log record whose transaction id is not less than transactionId.
-     */
-    public Future<DLSN> getDLSNNotLessThanTxId(long transactionId);
-
-    /**
-     * Get the last log record in the stream
-     *
-     * @return the last log record in the stream
-     * @throws IOException if a stream cannot be found.
-     */
-    public LogRecordWithDLSN getLastLogRecord()
-        throws IOException;
-
-    /**
-     * Get the earliest Transaction Id available in the log
-     *
-     * @return earliest transaction id
-     * @throws IOException
-     */
-    public long getFirstTxId() throws IOException;
-
-    /**
-     * Get Latest Transaction Id in the log
-     *
-     * @return latest transaction id
-     * @throws IOException
-     */
-    public long getLastTxId() throws IOException;
-
-    /**
-     * Get Latest DLSN in the log
-     *
-     * @return last dlsn
-     * @throws IOException
-     */
-    public DLSN getLastDLSN() throws IOException;
-
-    /**
-     * Get Latest log record with DLSN in the log - async
-     *
-     * @return latest log record with DLSN
-     */
-    public Future<LogRecordWithDLSN> getLastLogRecordAsync();
-
-    /**
-     * Get Latest Transaction Id in the log - async
-     *
-     * @return latest transaction id
-     */
-    public Future<Long> getLastTxIdAsync();
-
-    /**
-     * Get first DLSN in the log.
-     *
-     * @return first dlsn in the stream
-     */
-    public Future<DLSN> getFirstDLSNAsync();
-
-    /**
-     * Get Latest DLSN in the log - async
-     *
-     * @return latest transaction id
-     */
-    public Future<DLSN> getLastDLSNAsync();
-
-    /**
-     * Get the number of log records in the active portion of the log
-     * Any log segments that have already been truncated will not be included
-     *
-     * @return number of log records
-     * @throws IOException
-     */
-    public long getLogRecordCount() throws IOException;
-
-    /**
-     * Get the number of log records in the active portion of the log - async.
-     * Any log segments that have already been truncated will not be included
-     *
-     * @return future number of log records
-     * @throws IOException
-     */
-    public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN);
-
-    /**
-     * Run recovery on the log.
-     *
-     * @throws IOException
-     */
-    public void recover() throws IOException;
-
-    /**
-     * Check if an end of stream marker was added to the stream
-     * A stream with an end of stream marker cannot be appended to
-     *
-     * @return true if the marker was added to the stream, false otherwise
-     * @throws IOException
-     */
-    public boolean isEndOfStreamMarked() throws IOException;
-
-    /**
-     * Delete the log.
-     *
-     * @throws IOException if the deletion fails
-     */
-    public void delete() throws IOException;
-
-    /**
-     * The DistributedLogManager may archive/purge any logs for transactionId
-     * less than or equal to minImageTxId.
-     * This is to be used only when the client explicitly manages deletion. If
-     * the cleanup policy is based on sliding time window, then this method need
-     * not be called.
-     *
-     * @param minTxIdToKeep the earliest txid that must be retained
-     * @throws IOException if purging fails
-     */
-    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
-
-    /**
-     * Get the subscriptions store provided by the distributedlog manager.
-     *
-     * @return subscriptions store manages subscriptions for current stream.
-     */
-    public SubscriptionsStore getSubscriptionsStore();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
index 617282c..30cd499 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
@@ -19,10 +19,10 @@ package org.apache.distributedlog;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
 import org.apache.distributedlog.exceptions.WriteException;
 import org.apache.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 
@@ -342,7 +342,7 @@ public class Entry {
          * @throws LogRecordTooLongException if the record is too long
          * @throws WriteException when encountered exception writing the record
          */
-        void writeRecord(LogRecord record, Promise<DLSN> transmitPromise)
+        void writeRecord(LogRecord record, CompletableFuture<DLSN> transmitPromise)
                 throws LogRecordTooLongException, WriteException;
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
index aed47fc..09301aa 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
@@ -30,10 +30,10 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 
-import org.apache.distributedlog.annotations.DistributedLogAnnotations.Compression;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations.Compression;
 import org.apache.distributedlog.io.CompressionCodec;
 import org.apache.distributedlog.io.CompressionUtils;
-import org.apache.distributedlog.util.BitMaskUtils;
+import org.apache.distributedlog.common.util.BitMaskUtils;
 
 /**
  * An enveloped entry written to BookKeeper.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
index 54858d7..18645d4 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.distributedlog;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.Entry.Writer;
 import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
@@ -24,7 +25,6 @@ import org.apache.distributedlog.exceptions.WriteCancelledException;
 import org.apache.distributedlog.exceptions.WriteException;
 import org.apache.distributedlog.io.Buffer;
 import org.apache.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,9 +46,9 @@ class EnvelopedEntryWriter implements Writer {
     private static class WriteRequest {
 
         private final int numRecords;
-        private final Promise<DLSN> promise;
+        private final CompletableFuture<DLSN> promise;
 
-        WriteRequest(int numRecords, Promise<DLSN> promise) {
+        WriteRequest(int numRecords, CompletableFuture<DLSN> promise) {
             this.numRecords = numRecords;
             this.promise = promise;
         }
@@ -89,7 +89,7 @@ class EnvelopedEntryWriter implements Writer {
 
     @Override
     public synchronized void writeRecord(LogRecord record,
-                                         Promise<DLSN> transmitPromise)
+                                         CompletableFuture<DLSN> transmitPromise)
             throws LogRecordTooLongException, WriteException {
         int logRecordSize = record.getPersistentSize();
         if (logRecordSize > MAX_LOGRECORD_SIZE) {
@@ -121,7 +121,7 @@ class EnvelopedEntryWriter implements Writer {
     private synchronized void satisfyPromises(long lssn, long entryId) {
         long nextSlotId = 0;
         for (WriteRequest request : writeRequests) {
-            request.promise.setValue(new DLSN(lssn, entryId, nextSlotId));
+            request.promise.complete(new DLSN(lssn, entryId, nextSlotId));
             nextSlotId += request.numRecords;
         }
         writeRequests.clear();
@@ -129,7 +129,7 @@ class EnvelopedEntryWriter implements Writer {
 
     private synchronized void cancelPromises(Throwable reason) {
         for (WriteRequest request : writeRequests) {
-            request.promise.setException(reason);
+            request.promise.completeExceptionally(reason);
         }
         writeRequests.clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
deleted file mode 100644
index baf3182..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.io.AsyncCloseable;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * <i>LogReader</i> is a `synchronous` reader reading records from a DL log.
- *
- * <h3>Lifecycle of a Reader</h3>
- *
- * A reader is a <i>sequential</i> reader that read records from a DL log starting
- * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)}
- * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}.
- * <p>
- * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)}
- * to read records out the log from provided position.
- * <p>
- * Closing the reader (via {@link #close()} will release all the resources occupied
- * by this reader instance.
- * <p>
- * Exceptions could be thrown during reading records. Once the exception is thrown,
- * the reader is set to an error state and it isn't usable anymore. It is the application's
- * responsibility to handle the exceptions and re-create readers if necessary.
- * <p>
- * Example:
- * <pre>
- * DistributedLogManager dlm = ...;
- * long nextTxId = ...;
- * LogReader reader = dlm.getInputStream(nextTxId);
- *
- * while (true) { // keep reading & processing records
- *     LogRecord record;
- *     try {
- *         record = reader.readNext(false);
- *         nextTxId = record.getTransactionId();
- *         // process the record
- *         ...
- *     } catch (IOException ioe) {
- *         // handle the exception
- *         ...
- *         reader = dlm.getInputStream(nextTxId + 1);
- *     }
- * }
- *
- * </pre>
- *
- * <h3>Read Records</h3>
- *
- * Reading records from an <i>endless</i> log in `synchronous` way isn't as
- * trivial as in `asynchronous` way (via {@link AsyncLogReader}. Because it
- * lacks of callback mechanism. LogReader introduces a flag `nonBlocking` on
- * controlling the <i>waiting</i> behavior on `synchronous` reads.
- *
- * <h4>Blocking vs NonBlocking</h4>
- *
- * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
- * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true)
- * means the reads will only check readahead cache and return whatever records
- * available in the readahead cache.
- * <p>
- * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is
- * catching up with writer (there are records in the log), the read call will
- * wait until records are read and returned. If the reader is caught up with
- * writer (there are no more records in the log at read time), the read call
- * will wait for a small period of time (defined in
- * {@link DistributedLogConfiguration#getReadAheadWaitTime()} and return whatever
- * records available in the readahead cache. In other words, if a reader sees
- * no record on blocking reads, it means the reader is `caught-up` with the
- * writer.
- * <p>
- * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated
- * state machines. Applications could use <i>blocking</i> reads till caught up
- * with latest data. Once they are caught up with latest data, they could start
- * serving their service and turn to <i>non-blocking</i> read mode and tail read
- * data from the logs.
- * <p>
- * See examples below.
- *
- * <h4>Read Single Record</h4>
- *
- * {@link #readNext(boolean)} is reading individual records from a DL log.
- *
- * <pre>
- * LogReader reader = ...
- *
- * // keep reading records in blocking way until no records available in the log
- * LogRecord record = reader.readNext(false);
- * while (null != record) {
- *     // process the record
- *     ...
- *     // read next record
- *     records = reader.readNext(false);
- * }
- *
- * ...
- *
- * // reader is caught up with writer, doing non-blocking reads to tail the log
- * while (true) {
- *     record = reader.readNext(true)
- *     // process the new records
- *     ...
- * }
- * </pre>
- *
- * <h4>Read Batch of Records</h4>
- *
- * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records
- * from a DL log.
- *
- * <pre>
- * LogReader reader = ...
- * int N = 10;
- *
- * // keep reading N records in blocking way until no records available in the log
- * List<LogRecord> records = reader.readBulk(false, N);
- * while (!records.isEmpty()) {
- *     // process the list of records
- *     ...
- *     if (records.size() < N) { // no more records available in the log
- *         break;
- *     }
- *     // read next N records
- *     records = reader.readBulk(false, N);
- * }
- *
- * ...
- *
- * // reader is caught up with writer, doing non-blocking reads to tail the log
- * while (true) {
- *     records = reader.readBulk(true, N)
- *     // process the new records
- *     ...
- * }
- *
- * </pre>
- *
- * <p>
- * NOTE: Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing
- * the {@link AsyncCloseable} interface so the reader could be closed asynchronously
- *
- * @see AsyncLogReader
- */
-public interface LogReader extends Closeable, AsyncCloseable {
-
-    /**
-     * Read the next log record from the stream.
-     * <p>
-     * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling
-     * records from read ahead cache. It would return <i>null</i> if there isn't any records
-     * available in the read ahead cache.
-     * <p>
-     * If <i>nonBlocking</i> is set to false, it would does blocking call. The call will
-     * block until return a record if there are records in the stream (aka catching up).
-     * Otherwise it would wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
-     * milliseconds and return null if there isn't any more records in the stream.
-     *
-     * @param nonBlocking should the read make blocking calls to the backend or rely on the
-     * readAhead cache
-     * @return an operation from the stream or null if at end of stream
-     * @throws IOException if there is an error reading from the stream
-     */
-    public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;
-
-    /**
-     * Read the next <i>numLogRecords</i> log records from the stream
-     *
-     * @param nonBlocking should the read make blocking calls to the backend or rely on the
-     * readAhead cache
-     * @param numLogRecords maximum number of log records returned by this call.
-     * @return an operation from the stream or empty list if at end of stream
-     * @throws IOException if there is an error reading from the stream
-     * @see #readNext(boolean)
-     */
-    public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
index c5050ec..462ddaa 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
@@ -23,14 +23,13 @@ import java.util.Comparator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
 import org.apache.distributedlog.exceptions.UnsupportedMetadataVersionException;
 import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -590,21 +589,21 @@ public class LogSegmentMetadata {
                 .build();
     }
 
-    public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path) {
+    public static CompletableFuture<LogSegmentMetadata> read(ZooKeeperClient zkc, String path) {
         return read(zkc, path, false);
     }
 
-    public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path, final boolean skipMinVersionCheck) {
-        final Promise<LogSegmentMetadata> result = new Promise<LogSegmentMetadata>();
+    public static CompletableFuture<LogSegmentMetadata> read(ZooKeeperClient zkc, String path, final boolean skipMinVersionCheck) {
+        final CompletableFuture<LogSegmentMetadata> result = new CompletableFuture<LogSegmentMetadata>();
         try {
             zkc.get().getData(path, false, new AsyncCallback.DataCallback() {
                 @Override
                 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                     if (KeeperException.Code.OK.intValue() != rc) {
                         if (KeeperException.Code.NONODE.intValue() == rc) {
-                            FutureUtils.setException(result, new LogSegmentNotFoundException(path));
+                            FutureUtils.completeExceptionally(result, new LogSegmentNotFoundException(path));
                         } else {
-                            FutureUtils.setException(result,
+                            FutureUtils.completeExceptionally(result,
                                     new ZKException("Failed to read log segment metadata from " + path,
                                             KeeperException.Code.get(rc)));
                         }
@@ -612,17 +611,17 @@ public class LogSegmentMetadata {
                     }
                     try {
                         LogSegmentMetadata metadata = parseData(path, data, skipMinVersionCheck);
-                        FutureUtils.setValue(result, metadata);
+                        FutureUtils.complete(result, metadata);
                     } catch (IOException ie) {
                         LOG.error("Error on parsing log segment metadata from {} : ", path, ie);
-                        result.setException(ie);
+                        result.completeExceptionally(ie);
                     }
                 }
             }, null);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            result.setException(FutureUtils.zkException(e, path));
+            result.completeExceptionally(Utils.zkException(e, path));
         } catch (InterruptedException e) {
-            result.setException(FutureUtils.zkException(e, path));
+            result.completeExceptionally(Utils.zkException(e, path));
         }
         return result;
     }