You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:41 UTC

[09/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java
deleted file mode 100644
index 8e4a8be..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.exceptions.BKTransmitException;
-import org.apache.distributedlog.exceptions.LockingException;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.stats.OpStatsListener;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureCancelledException;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import com.twitter.util.Try;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utilities to process future
- */
-public class FutureUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger(FutureUtils.class);
-
-    public static class OrderedFutureEventListener<R>
-            implements FutureEventListener<R> {
-
-        public static <R> OrderedFutureEventListener<R> of(
-                FutureEventListener<R> listener,
-                OrderedScheduler scheduler,
-                Object key) {
-            return new OrderedFutureEventListener<R>(scheduler, key, listener);
-        }
-
-        private final OrderedScheduler scheduler;
-        private final Object key;
-        private final FutureEventListener<R> listener;
-
-        private OrderedFutureEventListener(OrderedScheduler scheduler,
-                                           Object key,
-                                           FutureEventListener<R> listener) {
-            this.scheduler = scheduler;
-            this.key = key;
-            this.listener = listener;
-        }
-
-        @Override
-        public void onSuccess(final R value) {
-            scheduler.submit(key, new Runnable() {
-                @Override
-                public void run() {
-                    listener.onSuccess(value);
-                }
-            });
-        }
-
-        @Override
-        public void onFailure(final Throwable cause) {
-            scheduler.submit(key, new Runnable() {
-                @Override
-                public void run() {
-                    listener.onFailure(cause);
-                }
-            });
-        }
-    }
-
-    public static class FutureEventListenerRunnable<R>
-            implements FutureEventListener<R> {
-
-        public static <R> FutureEventListenerRunnable<R> of(
-                FutureEventListener<R> listener,
-                ExecutorService executorService) {
-            return new FutureEventListenerRunnable<R>(executorService, listener);
-        }
-
-        private final ExecutorService executorService;
-        private final FutureEventListener<R> listener;
-
-        private FutureEventListenerRunnable(ExecutorService executorService,
-                                            FutureEventListener<R> listener) {
-            this.executorService = executorService;
-            this.listener = listener;
-        }
-
-        @Override
-        public void onSuccess(final R value) {
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    listener.onSuccess(value);
-                }
-            });
-        }
-
-        @Override
-        public void onFailure(final Throwable cause) {
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    listener.onFailure(cause);
-                }
-            });
-        }
-    }
-
-    private static class ListFutureProcessor<T, R>
-            extends Function<Throwable, BoxedUnit>
-            implements FutureEventListener<R>, Runnable {
-
-        private volatile boolean interrupted = false;
-        private final Iterator<T> itemsIter;
-        private final Function<T, Future<R>> processFunc;
-        private final Promise<List<R>> promise;
-        private final List<R> results;
-        private final ExecutorService callbackExecutor;
-
-        ListFutureProcessor(List<T> items,
-                            Function<T, Future<R>> processFunc,
-                            ExecutorService callbackExecutor) {
-            this.itemsIter = items.iterator();
-            this.processFunc = processFunc;
-            this.promise = new Promise<List<R>>();
-            this.promise.setInterruptHandler(this);
-            this.results = new ArrayList<R>();
-            this.callbackExecutor = callbackExecutor;
-        }
-
-        @Override
-        public BoxedUnit apply(Throwable cause) {
-            interrupted = true;
-            return BoxedUnit.UNIT;
-        }
-
-        @Override
-        public void onSuccess(R value) {
-            results.add(value);
-            if (null == callbackExecutor) {
-                run();
-            } else {
-                callbackExecutor.submit(this);
-            }
-        }
-
-        @Override
-        public void onFailure(final Throwable cause) {
-            if (null == callbackExecutor) {
-                promise.setException(cause);
-            } else {
-                callbackExecutor.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        promise.setException(cause);
-                    }
-                });
-            }
-        }
-
-        @Override
-        public void run() {
-            if (interrupted) {
-                logger.debug("ListFutureProcessor is interrupted.");
-                return;
-            }
-            if (!itemsIter.hasNext()) {
-                promise.setValue(results);
-                return;
-            }
-            processFunc.apply(itemsIter.next()).addEventListener(this);
-        }
-    }
-
-    /**
-     * Process the list of items one by one using the process function <i>processFunc</i>.
-     * The process will be stopped immediately if it fails on processing any one.
-     *
-     * @param collection list of items
-     * @param processFunc process function
-     * @param callbackExecutor executor to process the item
-     * @return future presents the list of processed results
-     */
-    public static <T, R> Future<List<R>> processList(List<T> collection,
-                                                     Function<T, Future<R>> processFunc,
-                                                     @Nullable ExecutorService callbackExecutor) {
-        ListFutureProcessor<T, R> processor =
-                new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor);
-        if (null != callbackExecutor) {
-            callbackExecutor.submit(processor);
-        } else {
-            processor.run();
-        }
-        return processor.promise;
-    }
-
-    /**
-     * Add a event listener over <i>result</i> for collecting the operation stats.
-     *
-     * @param result result to listen on
-     * @param opStatsLogger stats logger to record operations stats
-     * @param stopwatch stop watch to time operation
-     * @param <T>
-     * @return result after registered the event listener
-     */
-    public static <T> Future<T> stats(Future<T> result,
-                                      OpStatsLogger opStatsLogger,
-                                      Stopwatch stopwatch) {
-        return result.addEventListener(new OpStatsListener<T>(opStatsLogger, stopwatch));
-    }
-
-    /**
-     * Await for the result of the future and thrown bk related exceptions.
-     *
-     * @param result future to wait for
-     * @return the result of future
-     * @throws BKException when exceptions are thrown by the future. If there is unkown exceptions
-     *         thrown from the future, the exceptions will be wrapped into
-     *         {@link org.apache.bookkeeper.client.BKException.BKUnexpectedConditionException}.
-     */
-    public static <T> T bkResult(Future<T> result) throws BKException {
-        try {
-            return Await.result(result);
-        } catch (BKException bke) {
-            throw bke;
-        } catch (InterruptedException ie) {
-            throw BKException.create(BKException.Code.InterruptedException);
-        } catch (Exception e) {
-            logger.warn("Encountered unexpected exception on waiting bookkeeper results : ", e);
-            throw BKException.create(BKException.Code.UnexpectedConditionException);
-        }
-    }
-
-    /**
-     * Return the bk exception return code for a <i>throwable</i>.
-     *
-     * @param throwable the cause of the exception
-     * @return the bk exception return code. if the exception isn't bk exceptions,
-     *         it would return {@link BKException.Code#UnexpectedConditionException}.
-     */
-    public static int bkResultCode(Throwable throwable) {
-        if (throwable instanceof BKException) {
-            return ((BKException)throwable).getCode();
-        }
-        return BKException.Code.UnexpectedConditionException;
-    }
-
-    /**
-     * Wait for the result until it completes.
-     *
-     * @param result result to wait
-     * @return the result
-     * @throws IOException when encountered exceptions on the result
-     */
-    public static <T> T result(Future<T> result) throws IOException {
-        return result(result, Duration.Top());
-    }
-
-    /**
-     * Wait for the result for a given <i>duration</i>.
-     * <p>If the result is not ready within `duration`, an IOException will thrown wrapping with
-     * corresponding {@link com.twitter.util.TimeoutException}.
-     *
-     * @param result result to wait
-     * @param duration duration to wait
-     * @return the result
-     * @throws IOException when encountered exceptions on the result or waiting for the result.
-     */
-    public static <T> T result(Future<T> result, Duration duration)
-            throws IOException {
-        try {
-            return Await.result(result, duration);
-        } catch (KeeperException ke) {
-            throw new ZKException("Encountered zookeeper exception on waiting result", ke);
-        } catch (BKException bke) {
-            throw new BKTransmitException("Encountered bookkeeper exception on waiting result", bke.getCode());
-        } catch (IOException ioe) {
-            throw ioe;
-        } catch (InterruptedException ie) {
-            throw new DLInterruptedException("Interrupted on waiting result", ie);
-        } catch (Exception e) {
-            throw new IOException("Encountered exception on waiting result", e);
-        }
-    }
-
-    /**
-     * Wait for the result of a lock operation.
-     *
-     * @param result result to wait
-     * @param lockPath path of the lock
-     * @return the result
-     * @throws LockingException when encountered exceptions on the result of lock operation
-     */
-    public static <T> T lockResult(Future<T> result, String lockPath) throws LockingException {
-        try {
-            return Await.result(result);
-        } catch (LockingException le) {
-            throw le;
-        } catch (Exception e) {
-            throw new LockingException(lockPath, "Encountered exception on locking ", e);
-        }
-    }
-
-    /**
-     * Convert the <i>throwable</i> to zookeeper related exceptions.
-     *
-     * @param throwable cause
-     * @param path zookeeper path
-     * @return zookeeper related exceptions
-     */
-    public static Throwable zkException(Throwable throwable, String path) {
-        if (throwable instanceof KeeperException) {
-            return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable);
-        } else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) {
-            return new ZKException("Encountered zookeeper connection loss on " + path,
-                    KeeperException.Code.CONNECTIONLOSS);
-        } else if (throwable instanceof InterruptedException) {
-            return new DLInterruptedException("Interrupted on operating " + path, throwable);
-        } else {
-            return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable);
-        }
-    }
-
-    /**
-     * Cancel the future. It would interrupt the future.
-     *
-     * @param future future to cancel
-     */
-    public static <T> void cancel(Future<T> future) {
-        future.raise(new FutureCancelledException());
-    }
-
-    /**
-     * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period.
-     * If the promise has been satisfied before raising, it won't change the state of the promise.
-     *
-     * @param promise promise to raise exception
-     * @param timeout timeout period
-     * @param unit timeout period unit
-     * @param cause cause to raise
-     * @param scheduler scheduler to execute raising exception
-     * @param key the submit key used by the scheduler
-     * @return the promise applied with the raise logic
-     */
-    public static <T> Promise<T> within(final Promise<T> promise,
-                                        final long timeout,
-                                        final TimeUnit unit,
-                                        final Throwable cause,
-                                        final OrderedScheduler scheduler,
-                                        final Object key) {
-        if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) {
-            return promise;
-        }
-        // schedule a timeout to raise timeout exception
-        final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() {
-            @Override
-            public void run() {
-                if (!promise.isDefined() && FutureUtils.setException(promise, cause)) {
-                    logger.info("Raise exception", cause);
-                }
-            }
-        }, timeout, unit);
-        // when the promise is satisfied, cancel the timeout task
-        promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Try<T> value) {
-                if (!task.cancel(true)) {
-                    logger.debug("Failed to cancel the timeout task");
-                }
-                return BoxedUnit.UNIT;
-            }
-        });
-        return promise;
-    }
-
-    /**
-     * Satisfy the <i>promise</i> with provide value in an ordered scheduler.
-     * <p>If the promise was already satisfied, nothing will be changed.
-     *
-     * @param promise promise to satisfy
-     * @param value value to satisfy
-     * @param scheduler scheduler to satisfy the promise with provided value
-     * @param key the submit key of the ordered scheduler
-     */
-    public static <T> void setValue(final Promise<T> promise,
-                                    final T value,
-                                    OrderedScheduler scheduler,
-                                    Object key) {
-        scheduler.submit(key, new Runnable() {
-            @Override
-            public void run() {
-                setValue(promise, value);
-            }
-        });
-    }
-
-    /**
-     * Satisfy the <i>promise</i> with provide value.
-     * <p>If the promise was already satisfied, nothing will be changed.
-     *
-     * @param promise promise to satisfy
-     * @param value value to satisfy
-     * @return true if successfully satisfy the future. false if the promise has been satisfied.
-     */
-    public static <T> boolean setValue(Promise<T> promise, T value) {
-        boolean success = promise.updateIfEmpty(new Return<T>(value));
-        if (!success) {
-            logger.info("Result set multiple times. Value = '{}', New = 'Return({})'",
-                    promise.poll(), value);
-        }
-        return success;
-    }
-
-    /**
-     * Satisfy the <i>promise</i> with provided <i>cause</i> in an ordered scheduler.
-     *
-     * @param promise promise to satisfy
-     * @param throwable cause to satisfy
-     * @param scheduler the scheduler to satisfy the promise
-     * @param key submit key of the ordered scheduler
-     */
-    public static <T> void setException(final Promise<T> promise,
-                                        final Throwable cause,
-                                        OrderedScheduler scheduler,
-                                        Object key) {
-        scheduler.submit(key, new Runnable() {
-            @Override
-            public void run() {
-                setException(promise, cause);
-            }
-        });
-    }
-
-    /**
-     * Satisfy the <i>promise</i> with provided <i>cause</i>.
-     *
-     * @param promise promise to satisfy
-     * @param cause cause to satisfy
-     * @return true if successfully satisfy the future. false if the promise has been satisfied.
-     */
-    public static <T> boolean setException(Promise<T> promise, Throwable cause) {
-        boolean success = promise.updateIfEmpty(new Throw<T>(cause));
-        if (!success) {
-            logger.info("Result set multiple times. Value = '{}', New = 'Throw({})'",
-                    promise.poll(), cause);
-        }
-        return success;
-    }
-
-    /**
-     * Ignore exception from the <i>future</i>.
-     *
-     * @param future the original future
-     * @return a transformed future ignores exceptions
-     */
-    public static <T> Promise<Void> ignore(Future<T> future) {
-        return ignore(future, null);
-    }
-
-    /**
-     * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions
-     *
-     * @param future the original future
-     * @param errorMsg the error message to log on exceptions
-     * @return a transformed future ignores exceptions
-     */
-    public static <T> Promise<Void> ignore(Future<T> future, final String errorMsg) {
-        final Promise<Void> promise = new Promise<Void>();
-        future.addEventListener(new FutureEventListener<T>() {
-            @Override
-            public void onSuccess(T value) {
-                setValue(promise, null);
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                if (null != errorMsg) {
-                    logger.error(errorMsg, cause);
-                }
-                setValue(promise, null);
-            }
-        });
-        return promise;
-    }
-
-    /**
-     * Create transmit exception from transmit result.
-     *
-     * @param transmitResult
-     *          transmit result (basically bk exception code)
-     * @return transmit exception
-     */
-    public static BKTransmitException transmitException(int transmitResult) {
-        return new BKTransmitException("Failed to write to bookkeeper; Error is ("
-            + transmitResult + ") "
-            + BKException.getMessage(transmitResult), transmitResult);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java
deleted file mode 100644
index 3372476..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.google.common.base.Stopwatch;
-
-import com.twitter.util.FuturePool;
-import com.twitter.util.FuturePool$;
-import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import scala.runtime.BoxedUnit;
-import scala.Function0;
-
-/**
- * {@link FuturePool} with exposed stats. This class is exposing following stats for helping understanding
- * the healthy of this thread pool executor.
- * <h3>Metrics</h3>
- * Stats are only exposed when <code>traceTaskExecution</code> is true.
- * <ul>
- * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on waiting
- * being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on executing.
- * <li>task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on submitting.
- * <li>tasks_pending: gauge. how many tasks are pending in this future pool.
- * </ul>
- */
-public class MonitoredFuturePool implements FuturePool {
-    static final Logger LOG = LoggerFactory.getLogger(MonitoredFuturePool.class);
-
-    private final FuturePool futurePool;
-
-    private final StatsLogger statsLogger;
-    private final OpStatsLogger taskPendingTime;
-    private final OpStatsLogger taskExecutionTime;
-    private final OpStatsLogger taskEnqueueTime;
-    private final Counter taskPendingCounter;
-
-    private final boolean traceTaskExecution;
-    private final long traceTaskExecutionWarnTimeUs;
-
-    class TimedFunction0<T> extends com.twitter.util.Function0<T> {
-        private final Function0<T> function0;
-        private Stopwatch pendingStopwatch = Stopwatch.createStarted();
-
-        TimedFunction0(Function0<T> function0) {
-            this.function0 = function0;
-            this.pendingStopwatch = Stopwatch.createStarted();
-        }
-
-        @Override
-        public T apply() {
-            taskPendingTime.registerSuccessfulEvent(pendingStopwatch.elapsed(TimeUnit.MICROSECONDS));
-            Stopwatch executionStopwatch = Stopwatch.createStarted();
-            T result = function0.apply();
-            taskExecutionTime.registerSuccessfulEvent(executionStopwatch.elapsed(TimeUnit.MICROSECONDS));
-            long elapsed = executionStopwatch.elapsed(TimeUnit.MICROSECONDS);
-            if (elapsed > traceTaskExecutionWarnTimeUs) {
-                LOG.info("{} took too long {} microseconds", function0.toString(), elapsed);
-            }
-            return result;
-        }
-    }
-
-    /**
-     * Create a future pool with stats exposed.
-     *
-     * @param futurePool underlying future pool to execute futures
-     * @param statsLogger stats logger to receive exposed stats
-     * @param traceTaskExecution flag to enable/disable exposing stats about task execution
-     * @param traceTaskExecutionWarnTimeUs flag to enable/disable logging slow tasks
-     *                                     whose execution time is above this value
-     */
-    public MonitoredFuturePool(FuturePool futurePool,
-                               StatsLogger statsLogger,
-                               boolean traceTaskExecution,
-                               long traceTaskExecutionWarnTimeUs) {
-        this.futurePool = futurePool;
-        this.traceTaskExecution = traceTaskExecution;
-        this.traceTaskExecutionWarnTimeUs = traceTaskExecutionWarnTimeUs;
-        this.statsLogger = statsLogger;
-        this.taskPendingTime = statsLogger.getOpStatsLogger("task_pending_time");
-        this.taskExecutionTime = statsLogger.getOpStatsLogger("task_execution_time");
-        this.taskEnqueueTime = statsLogger.getOpStatsLogger("task_enqueue_time");
-        this.taskPendingCounter = statsLogger.getCounter("tasks_pending");
-    }
-
-    @Override
-    public <T> Future<T> apply(Function0<T> function0) {
-        if (traceTaskExecution) {
-            taskPendingCounter.inc();
-            Stopwatch taskEnqueueStopwatch = Stopwatch.createStarted();
-            Future<T> futureResult = futurePool.apply(new TimedFunction0<T>(function0));
-            taskEnqueueTime.registerSuccessfulEvent(taskEnqueueStopwatch.elapsed(TimeUnit.MICROSECONDS));
-            futureResult.ensure(new com.twitter.util.Function0<BoxedUnit>() {
-                @Override
-                public BoxedUnit apply() {
-                    taskPendingCounter.dec();
-                    return null;
-                }
-            });
-            return futureResult;
-        } else {
-            return futurePool.apply(function0);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
deleted file mode 100644
index 3121a19..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link ScheduledThreadPoolExecutor} with exposed stats. This class is exposing following stats for
- * helping understanding the healthy of this thread pool executor.
- * <h3>Metrics</h3>
- * <ul>
- * <li>pending_tasks: gauge. how many tasks are pending in this executor.
- * <li>completed_tasks: gauge. how many tasks are completed in this executor.
- * <li>total_tasks: gauge. how many tasks are submitted to this executor.
- * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on
- * waiting being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on
- * executing.
- * </ul>
- */
-public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
-    static final Logger LOG = LoggerFactory.getLogger(MonitoredScheduledThreadPoolExecutor.class);
-
-    private class TimedRunnable implements Runnable {
-
-        final Runnable runnable;
-        final long enqueueNanos;
-
-        TimedRunnable(Runnable runnable) {
-            this.runnable = runnable;
-            this.enqueueNanos = MathUtils.nowInNano();
-        }
-
-        @Override
-        public void run() {
-            long startNanos = MathUtils.nowInNano();
-            long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
-            taskPendingStats.registerSuccessfulEvent(pendingMicros);
-            try {
-                runnable.run();
-            } finally {
-                long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
-                taskExecutionStats.registerSuccessfulEvent(executionMicros);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return runnable.toString();
-        }
-
-        @Override
-        public int hashCode() {
-            return runnable.hashCode();
-        }
-    }
-
-    private class TimedCallable<T> implements Callable<T> {
-
-        final Callable<T> task;
-        final long enqueueNanos;
-
-        TimedCallable(Callable<T> task) {
-            this.task = task;
-            this.enqueueNanos = MathUtils.nowInNano();
-        }
-
-        @Override
-        public T call() throws Exception {
-            long startNanos = MathUtils.nowInNano();
-            long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
-            taskPendingStats.registerSuccessfulEvent(pendingMicros);
-            try {
-                return task.call();
-            } finally {
-                long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
-                taskExecutionStats.registerSuccessfulEvent(executionMicros);
-            }
-        }
-    }
-
-    protected final boolean traceTaskExecution;
-    protected final OpStatsLogger taskExecutionStats;
-    protected final OpStatsLogger taskPendingStats;
-    protected final StatsLogger statsLogger;
-    // Gauges and their labels
-    private static final String pendingTasksGaugeLabel = "pending_tasks";
-    private final Gauge<Number> pendingTasksGauge;
-    private static final String completedTasksGaugeLabel = "completed_tasks";
-    protected final Gauge<Number> completedTasksGauge;
-    private static final String totalTasksGaugeLabel = "total_tasks";
-    protected final Gauge<Number> totalTasksGauge;
-
-    public MonitoredScheduledThreadPoolExecutor(int corePoolSize,
-                                                ThreadFactory threadFactory,
-                                                StatsLogger statsLogger,
-                                                boolean traceTaskExecution) {
-        super(corePoolSize, threadFactory);
-        this.traceTaskExecution = traceTaskExecution;
-        this.statsLogger = statsLogger;
-        this.taskPendingStats = this.statsLogger.getOpStatsLogger("task_pending_time");
-        this.taskExecutionStats = this.statsLogger.getOpStatsLogger("task_execution_time");
-        this.pendingTasksGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return getQueue().size();
-            }
-        };
-        this.completedTasksGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return getCompletedTaskCount();
-            }
-        };
-        this.totalTasksGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return getTaskCount();
-            }
-        };
-
-        // outstanding tasks
-        this.statsLogger.registerGauge(pendingTasksGaugeLabel, pendingTasksGauge);
-        // completed tasks
-        this.statsLogger.registerGauge(completedTasksGaugeLabel, completedTasksGauge);
-        // total tasks
-        this.statsLogger.registerGauge(totalTasksGaugeLabel, pendingTasksGauge);
-    }
-
-    private Runnable timedRunnable(Runnable r) {
-        return traceTaskExecution ? new TimedRunnable(r) : r;
-    }
-
-    private <T> Callable<T> timedCallable(Callable<T> task) {
-        return traceTaskExecution ? new TimedCallable<T>(task) : task;
-    }
-
-    @Override
-    public Future<?> submit(Runnable task) {
-        return super.submit(timedRunnable(task));
-    }
-
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-        return super.submit(timedRunnable(task), result);
-    }
-
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-        return super.submit(timedCallable(task));
-    }
-
-    @Override
-    protected void afterExecute(Runnable r, Throwable t) {
-        super.afterExecute(r, t);
-        Throwable hiddenThrowable = extractThrowable(r);
-        if (hiddenThrowable != null)
-            logAndHandle(hiddenThrowable, true);
-
-        // The executor re-throws exceptions thrown by the task to the uncaught exception handler
-        // so we don't need to pass the exception to the handler explicitly
-        if (null != t) {
-            logAndHandle(t, false);
-        }
-    }
-
-    /**
-     * The executor re-throws exceptions thrown by the task to the uncaught exception handler
-     * so we only need to do anything if uncaught exception handler has not been se
-     */
-    private void logAndHandle(Throwable t, boolean passToHandler) {
-        if (Thread.getDefaultUncaughtExceptionHandler() == null) {
-            LOG.error("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
-        }
-        else {
-            LOG.info("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
-            if (passToHandler) {
-                Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
-            }
-        }
-    }
-
-
-    /**
-     * Extract the exception (throwable) inside the ScheduledFutureTask
-     * @param runnable - The runable that was executed
-     * @return exception enclosed in the Runnable if any; null otherwise
-     */
-    private Throwable extractThrowable(Runnable runnable) {
-        // Check for exceptions wrapped by FutureTask.
-        // We do this by calling get(), which will cause it to throw any saved exception.
-        // Check for isDone to prevent blocking
-        if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone()) {
-            try {
-                ((Future<?>) runnable).get();
-            } catch (CancellationException e) {
-                LOG.debug("Task {} cancelled", runnable, e.getCause());
-            } catch (InterruptedException e) {
-                LOG.debug("Task {} was interrupted", runnable, e);
-            } catch (ExecutionException e) {
-                return e.getCause();
-            }
-        }
-
-        return null;
-    }
-
-    void unregisterGauges() {
-        this.statsLogger.unregisterGauge(pendingTasksGaugeLabel, pendingTasksGauge);
-        this.statsLogger.unregisterGauge(completedTasksGaugeLabel, completedTasksGauge);
-        this.statsLogger.unregisterGauge(totalTasksGaugeLabel, totalTasksGauge);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
deleted file mode 100644
index ad1ba4e..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
-import com.twitter.util.TimerTask;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import scala.Function0;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Ordered Scheduler. It is thread pool based {@link ScheduledExecutorService}, additionally providing
- * the ability to execute/schedule tasks by <code>key</code>. Hence the tasks submitted by same <i>key</i>
- * will be executed in order.
- * <p>
- * The scheduler is comprised of multiple {@link MonitoredScheduledThreadPoolExecutor}s. Each
- * {@link MonitoredScheduledThreadPoolExecutor} is a single thread executor. Normal task submissions will
- * be submitted to executors in a random manner to guarantee load balancing. Keyed task submissions (e.g
- * {@link OrderedScheduler#apply(Object, Function0)} will be submitted to a dedicated executor based on
- * the hash value of submit <i>key</i>.
- *
- * <h3>Metrics</h3>
- *
- * <h4>Per Executor Metrics</h4>
- *
- * Metrics about individual executors are exposed via {@link Builder#perExecutorStatsLogger}
- * under <i>`scope`/`name`-executor-`id`-0</i>. `name` is the scheduler name provided by {@link Builder#name}
- * while `id` is the index of this executor in the pool. And corresponding stats of future pool of
- * that executor are exposed under <i>`scope`/`name`-executor-`id`-0/futurepool</i>.
- * <p>
- * See {@link MonitoredScheduledThreadPoolExecutor} and {@link MonitoredFuturePool} for per executor metrics
- * exposed.
- *
- * <h4>Aggregated Metrics</h4>
- * <ul>
- * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on
- * waiting being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on
- * executing.
- * <li>futurepool/task_pending_time: opstats. measuring the characteristics about the time that tasks spent
- * on waiting in future pool being executed.
- * <li>futurepool/task_execution_time: opstats. measuring the characteristics about the time that tasks spent
- * on executing.
- * <li>futurepool/task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on
- * submitting to future pool.
- * <li>futurepool/tasks_pending: gauge. how many tasks are pending in this future pool.
- * </ul>
- */
-public class OrderedScheduler implements ScheduledExecutorService {
-
-    /**
-     * Create a builder to build scheduler.
-     *
-     * @return scheduler builder
-     */
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder for {@link OrderedScheduler}.
-     */
-    public static class Builder {
-
-        private String name = "OrderedScheduler";
-        private int corePoolSize = -1;
-        private ThreadFactory threadFactory = null;
-        private boolean traceTaskExecution = false;
-        private long traceTaskExecutionWarnTimeUs = Long.MAX_VALUE;
-        private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-        private StatsLogger perExecutorStatsLogger = NullStatsLogger.INSTANCE;
-
-        /**
-         * Set the name of this scheduler. It would be used as part of stats scope and thread name.
-         *
-         * @param name
-         *          name of the scheduler.
-         * @return scheduler builder
-         */
-        public Builder name(String name) {
-            this.name = name;
-            return this;
-        }
-
-        /**
-         * Set the number of threads to be used in this scheduler.
-         *
-         * @param corePoolSize the number of threads to keep in the pool, even
-         *        if they are idle
-         * @return scheduler builder
-         */
-        public Builder corePoolSize(int corePoolSize) {
-            this.corePoolSize = corePoolSize;
-            return this;
-        }
-
-        /**
-         * Set the thread factory that the scheduler uses to create a new thread.
-         *
-         * @param threadFactory the factory to use when the executor
-         *        creates a new thread
-         * @return scheduler builder
-         */
-        public Builder threadFactory(ThreadFactory threadFactory) {
-            this.threadFactory = threadFactory;
-            return this;
-        }
-
-        /**
-         * Enable/Disable exposing task execution stats.
-         *
-         * @param trace
-         *          flag to enable/disable exposing task execution stats.
-         * @return scheduler builder
-         */
-        public Builder traceTaskExecution(boolean trace) {
-            this.traceTaskExecution = trace;
-            return this;
-        }
-
-        /**
-         * Enable/Disable logging slow tasks whose execution time is above <code>timeUs</code>.
-         *
-         * @param timeUs
-         *          slow task execution time threshold in us.
-         * @return scheduler builder.
-         */
-        public Builder traceTaskExecutionWarnTimeUs(long timeUs) {
-            this.traceTaskExecutionWarnTimeUs = timeUs;
-            return this;
-        }
-
-        /**
-         * Expose the aggregated stats over <code>statsLogger</code>.
-         *
-         * @param statsLogger
-         *          stats logger to receive aggregated stats.
-         * @return scheduler builder
-         */
-        public Builder statsLogger(StatsLogger statsLogger) {
-            this.statsLogger = statsLogger;
-            return this;
-        }
-
-        /**
-         * Expose stats of individual executors over <code>perExecutorStatsLogger</code>.
-         * Each executor's stats will be exposed under a sub-scope `name`-executor-`id`-0.
-         * `name` is the scheduler name, while `id` is the index of the scheduler in the pool.
-         *
-         * @param perExecutorStatsLogger
-         *          stats logger to receive per executor stats.
-         * @return scheduler builder
-         */
-        public Builder perExecutorStatsLogger(StatsLogger perExecutorStatsLogger) {
-            this.perExecutorStatsLogger = perExecutorStatsLogger;
-            return this;
-        }
-
-        /**
-         * Build the ordered scheduler.
-         *
-         * @return ordered scheduler
-         */
-        public OrderedScheduler build() {
-            if (corePoolSize <= 0) {
-                corePoolSize = Runtime.getRuntime().availableProcessors();
-            }
-            if (null == threadFactory) {
-                threadFactory = Executors.defaultThreadFactory();
-            }
-
-            return new OrderedScheduler(
-                    name,
-                    corePoolSize,
-                    threadFactory,
-                    traceTaskExecution,
-                    traceTaskExecutionWarnTimeUs,
-                    statsLogger,
-                    perExecutorStatsLogger);
-        }
-
-    }
-
-    protected final String name;
-    protected final int corePoolSize;
-    protected final MonitoredScheduledThreadPoolExecutor[] executors;
-    protected final MonitoredFuturePool[] futurePools;
-    protected final Random random;
-
-    private OrderedScheduler(String name,
-                             int corePoolSize,
-                             ThreadFactory threadFactory,
-                             boolean traceTaskExecution,
-                             long traceTaskExecutionWarnTimeUs,
-                             StatsLogger statsLogger,
-                             StatsLogger perExecutorStatsLogger) {
-        this.name = name;
-        this.corePoolSize = corePoolSize;
-        this.executors = new MonitoredScheduledThreadPoolExecutor[corePoolSize];
-        this.futurePools = new MonitoredFuturePool[corePoolSize];
-        for (int i = 0; i < corePoolSize; i++) {
-            ThreadFactory tf = new ThreadFactoryBuilder()
-                    .setNameFormat(name + "-executor-" + i + "-%d")
-                    .setThreadFactory(threadFactory)
-                    .build();
-            StatsLogger broadcastStatsLogger =
-                    BroadCastStatsLogger.masterslave(perExecutorStatsLogger.scope("executor-" + i), statsLogger);
-            executors[i] = new MonitoredScheduledThreadPoolExecutor(
-                    1, tf, broadcastStatsLogger, traceTaskExecution);
-            futurePools[i] = new MonitoredFuturePool(
-                    new ExecutorServiceFuturePool(executors[i]),
-                    broadcastStatsLogger.scope("futurepool"),
-                    traceTaskExecution,
-                    traceTaskExecutionWarnTimeUs);
-        }
-        this.random = new Random(System.currentTimeMillis());
-    }
-
-    protected MonitoredScheduledThreadPoolExecutor chooseExecutor() {
-        return corePoolSize == 1 ? executors[0] : executors[random.nextInt(corePoolSize)];
-    }
-
-    protected MonitoredScheduledThreadPoolExecutor chooseExecutor(Object key) {
-        return corePoolSize == 1 ? executors[0] :
-                executors[MathUtils.signSafeMod(Objects.hashCode(key), corePoolSize)];
-    }
-
-    protected FuturePool chooseFuturePool(Object key) {
-        return corePoolSize == 1 ? futurePools[0] :
-                futurePools[MathUtils.signSafeMod(Objects.hashCode(key), corePoolSize)];
-    }
-
-    protected FuturePool chooseFuturePool() {
-        return corePoolSize == 1 ? futurePools[0] : futurePools[random.nextInt(corePoolSize)];
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-        return chooseExecutor().schedule(command, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-        return chooseExecutor().schedule(callable, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
-                                                  long initialDelay, long period, TimeUnit unit) {
-        return chooseExecutor().scheduleAtFixedRate(command, initialDelay, period, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
-                                                     long initialDelay, long delay, TimeUnit unit) {
-        return chooseExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void shutdown() {
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            // Unregister gauges
-            executor.unregisterGauges();
-            executor.shutdown();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Runnable> shutdownNow() {
-        List<Runnable> runnables = new ArrayList<Runnable>();
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            runnables.addAll(executor.shutdownNow());
-        }
-        return runnables;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isShutdown() {
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            if (!executor.isShutdown()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isTerminated() {
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            if (!executor.isTerminated()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit)
-            throws InterruptedException {
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            if (!executor.awaitTermination(timeout, unit)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-        return chooseExecutor().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-        return chooseExecutor().submit(task, result);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Future<?> submit(Runnable task) {
-        return chooseExecutor().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-            throws InterruptedException {
-        return chooseExecutor().invokeAll(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-            throws InterruptedException {
-        return chooseExecutor().invokeAll(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-            throws InterruptedException, ExecutionException {
-        return chooseExecutor().invokeAny(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-            throws InterruptedException, ExecutionException, TimeoutException {
-        return chooseExecutor().invokeAny(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void execute(Runnable command) {
-        chooseExecutor().execute(command);
-    }
-
-    // Ordered Functions
-
-    /**
-     * Return a future pool used by <code>key</code>.
-     *
-     * @param key
-     *          key to order in the future pool
-     * @return future pool
-     */
-    public FuturePool getFuturePool(Object key) {
-        return chooseFuturePool(key);
-    }
-
-    /**
-     * Execute the <code>function</code> in the executor that assigned by <code>key</code>.
-     *
-     * @see com.twitter.util.Future
-     * @param key key of the <i>function</i> to run
-     * @param function function to run
-     * @return future representing the result of the <i>function</i>
-     */
-    public <T> com.twitter.util.Future<T> apply(Object key, Function0<T> function) {
-        return chooseFuturePool(key).apply(function);
-    }
-
-    /**
-     * Execute the <code>function</code> by the scheduler. It would be submitted to any executor randomly.
-     *
-     * @param function function to run
-     * @return future representing the result of the <i>function</i>
-     */
-    public <T> com.twitter.util.Future<T> apply(Function0<T> function) {
-        return chooseFuturePool().apply(function);
-    }
-
-    public ScheduledFuture<?> schedule(Object key, Runnable command, long delay, TimeUnit unit) {
-        return chooseExecutor(key).schedule(command, delay, unit);
-    }
-
-    public ScheduledFuture<?> scheduleAtFixedRate(Object key,
-                                                  Runnable command,
-                                                  long initialDelay,
-                                                  long period,
-                                                  TimeUnit unit) {
-        return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit);
-    }
-
-    public Future<?> submit(Object key, Runnable command) {
-        return chooseExecutor(key).submit(command);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java
deleted file mode 100644
index 15394dc..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-/**
- * A simple limiter interface which tracks acquire/release of permits, for
- * example for tracking outstanding writes.
- */
-public interface PermitLimiter {
-
-    public static PermitLimiter NULL_PERMIT_LIMITER = new PermitLimiter() {
-        @Override
-        public boolean acquire() {
-            return true;
-        }
-        @Override
-        public void release(int permits) {
-        }
-
-        @Override
-        public void close() {
-
-        }
-    };
-
-    /**
-     * Acquire a permit.
-     *
-     * @return true if successfully acquire a permit, otherwise false.
-     */
-    boolean acquire();
-
-    /**
-     * Release a permit.
-     */
-    void release(int permits);
-
-    /**
-     * Close the resources created by the limiter
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java
deleted file mode 100644
index 24c7860..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-public interface PermitManager {
-
-    public static interface Permit {
-        static final Permit ALLOWED = new Permit() {
-            @Override
-            public boolean isAllowed() {
-                return true;
-            }
-        };
-        boolean isAllowed();
-    }
-
-    public static PermitManager UNLIMITED_PERMIT_MANAGER = new PermitManager() {
-        @Override
-        public Permit acquirePermit() {
-            return Permit.ALLOWED;
-        }
-
-        @Override
-        public void releasePermit(Permit permit) {
-            // nop
-        }
-
-        @Override
-        public boolean allowObtainPermits() {
-            return true;
-        }
-
-        @Override
-        public boolean disallowObtainPermits(Permit permit) {
-            return false;
-        }
-
-        @Override
-        public void close() {
-            // nop
-        }
-
-    };
-
-    /**
-     * Obetain a permit from permit manager.
-     *
-     * @return permit.
-     */
-    Permit acquirePermit();
-
-    /**
-     * Release a given permit.
-     *
-     * @param permit
-     *          permit to release
-     */
-    void releasePermit(Permit permit);
-
-    /**
-     * Allow obtaining permits.
-     */
-    boolean allowObtainPermits();
-
-    /**
-     * Disallow obtaining permits. Disallow needs to be performed under the context
-     * of <i>permit</i>.
-     *
-     * @param permit
-     *          permit context to disallow
-     */
-    boolean disallowObtainPermits(Permit permit);
-
-    /**
-     * Release the resources
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java
deleted file mode 100644
index a467d26..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.google.common.base.Preconditions;
-
-import com.twitter.util.Function0;
-import com.twitter.util.FuturePool;
-import com.twitter.util.Future;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.runtime.BoxedUnit;
-
-/**
- * Acts like a future pool, but collects failed apply calls into a queue to be applied
- * in-order on close. This happens either in the close thread or after close is called,
- * in the last operation to complete execution.
- * Ops submitted after close will not be scheduled, so its important to ensure no more
- * ops will be applied once close has been called.
- */
-public class SafeQueueingFuturePool<T> {
-
-    static final Logger LOG = LoggerFactory.getLogger(SafeQueueingFuturePool.class);
-
-    private boolean closed;
-    private int outstanding;
-    private ConcurrentLinkedQueue<Function0<T>> queue;
-    private FuturePool orderedFuturePool;
-
-    public SafeQueueingFuturePool(FuturePool orderedFuturePool) {
-        this.closed = false;
-        this.outstanding = 0;
-        this.queue = new ConcurrentLinkedQueue<Function0<T>>();
-        this.orderedFuturePool = orderedFuturePool;
-    }
-
-    public synchronized Future<T> apply(final Function0<T> fn) {
-        Preconditions.checkNotNull(fn);
-        if (closed) {
-            return Future.exception(new RejectedExecutionException("Operation submitted to closed SafeQueueingFuturePool"));
-        }
-        ++outstanding;
-        queue.add(fn);
-        Future<T> result = orderedFuturePool.apply(new Function0<T>() {
-            @Override
-            public T apply() {
-                return queue.poll().apply();
-            }
-            @Override
-            public String toString() {
-                return fn.toString();
-            }
-        }).ensure(new Function0<BoxedUnit>() {
-            public BoxedUnit apply() {
-                if (decrOutstandingAndCheckDone()) {
-                    applyAll();
-                }
-                return null;
-            }
-        });
-        return result;
-    }
-
-    private synchronized boolean decrOutstandingAndCheckDone() {
-        return --outstanding == 0 && closed;
-    }
-
-    public void close() {
-        final boolean done;
-        synchronized (this) {
-            if (closed) {
-                return;
-            }
-            closed = true;
-            done = (outstanding == 0);
-        }
-        if (done) {
-            applyAll();
-        }
-    }
-
-    private void applyAll() {
-        if (!queue.isEmpty()) {
-            LOG.info("Applying {} items", queue.size());
-        }
-        while (!queue.isEmpty()) {
-            queue.poll().apply();
-        }
-    }
-
-    public synchronized int size() {
-        return queue.size();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java
deleted file mode 100644
index 66e382c..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class SchedulerUtils {
-
-    static final Logger logger = LoggerFactory.getLogger(SchedulerUtils.class);
-
-    public static void shutdownScheduler(ExecutorService service, long timeout, TimeUnit timeUnit) {
-        if (null == service) {
-            return;
-        }
-        service.shutdown();
-        try {
-            service.awaitTermination(timeout, timeUnit);
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted when shutting down scheduler : ", e);
-        }
-        service.shutdownNow();
-    }
-
-    public static void shutdownScheduler(OrderedSafeExecutor service, long timeout, TimeUnit timeUnit) {
-        if (null == service) {
-            return;
-        }
-        service.shutdown();
-        try {
-            service.awaitTermination(timeout, timeUnit);
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted when shutting down scheduler : ", e);
-        }
-        service.forceShutdown(timeout, timeUnit);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java
deleted file mode 100644
index ab8de35..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-/**
- * Sequencer generating transaction id.
- */
-public interface Sequencer {
-
-    /**
-     * Return next transaction id generated by the sequencer.
-     *
-     * @return next transaction id generated by the sequencer.
-     */
-    long nextId();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
index 767ddf6..3697b3f 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,8 +25,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.distributedlog.common.util.PermitLimiter;
 
 /**
  * Simple counter based {@link PermitLimiter}.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java
deleted file mode 100644
index 2f606e2..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-/**
- * The {@code Sizable} interface is to provide the capability of calculating size
- * of any objects.
- */
-public interface Sizable {
-    /**
-     * Calculate the size for this instance.
-     *
-     * @return size of the instance.
-     */
-    long size();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java
index 69dfdbe..5bc5af2 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog.util;
 
 import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.common.util.Sequencer;
 
 /**
  * Time based sequencer. It generated non-decreasing transaction id using milliseconds.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java
index 3a623dc..d90a7f8 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,7 +18,7 @@
 package org.apache.distributedlog.util;
 
 import com.google.common.annotations.Beta;
-import com.twitter.util.Future;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Util class represents a transaction
@@ -44,7 +44,7 @@ public interface Transaction<OpResult> {
     }
 
     /**
-     * Listener on the result of an {@link org.apache.distributedlog.util.Transaction.Op}.
+     * Listener on the result of an {@link Transaction.Op}.
      *
      * @param <OpResult>
      */
@@ -77,12 +77,12 @@ public interface Transaction<OpResult> {
 
     /**
      * Execute the current transaction. If the transaction succeed, all operations will be
-     * committed (via {@link org.apache.distributedlog.util.Transaction.Op#commit(Object)}.
+     * committed (via {@link Transaction.Op#commit(Object)}.
      * Otherwise, all operations will be aborted (via {@link Op#abort(Throwable, Object)}).
      *
      * @return future representing the result of transaction execution.
      */
-    Future<Void> execute();
+    CompletableFuture<Void> execute();
 
     /**
      * Abort current transaction. If this is called and the transaction haven't been executed by