You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/03 03:03:24 UTC

[GitHub] merlimat closed pull request #1309: Refactored OrderedSafeExecutor and OrderedScheduler

merlimat closed pull request #1309: Refactored OrderedSafeExecutor and OrderedScheduler
URL: https://github.com/apache/bookkeeper/pull/1309
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index db8ccbff1..3efff7a21 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -31,6 +31,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -38,7 +39,6 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -165,7 +165,7 @@ public static void main(String[] args)
             eventLoop = new NioEventLoopGroup();
         }
 
-        OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+        OrderedExecutor executor = OrderedExecutor.newBuilder()
                 .name("BenchBookieClientScheduler")
                 .numThreads(1)
                 .build();
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java
new file mode 100644
index 000000000..655d49de7
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implements {@link ExecutorService} and allows limiting the number of tasks to
+ * be scheduled in the thread's queue.
+ */
+public class BoundedExecutorService extends ForwardingExecutorService {
+    private final BlockingQueue<Runnable> queue;
+    private final ThreadPoolExecutor thread;
+    private final int maxTasksInQueue;
+
+    public BoundedExecutorService(ThreadPoolExecutor thread, int maxTasksInQueue) {
+        this.queue = thread.getQueue();
+        this.thread = thread;
+        this.maxTasksInQueue = maxTasksInQueue;
+    }
+
+    @Override
+    protected ExecutorService delegate() {
+        return this.thread;
+    }
+
+    private void checkQueue(int numberOfTasks) {
+        if (maxTasksInQueue > 0 && (queue.size() + numberOfTasks) > maxTasksInQueue) {
+            throw new RejectedExecutionException("Queue at limit of " + maxTasksInQueue + " items");
+        }
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        checkQueue(tasks.size());
+        return super.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        checkQueue(tasks.size());
+        return super.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        checkQueue(tasks.size());
+        return super.invokeAny(tasks);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        checkQueue(tasks.size());
+        return super.invokeAny(tasks, timeout, unit);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        checkQueue(1);
+        super.execute(command);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        checkQueue(1);
+        return super.submit(task);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        checkQueue(1);
+        return super.submit(task);
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        checkQueue(1);
+        return super.submit(task, result);
+    }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
index 482ca1898..44c6f38d2 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
@@ -59,83 +59,83 @@ protected ListeningExecutorService delegate() {
 
     @Override
     public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-        this.checkQueue();
+        this.checkQueue(1);
         return this.thread.schedule(command, delay, unit);
     }
 
     @Override
     public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-        this.checkQueue();
+        this.checkQueue(1);
         return this.thread.schedule(callable, delay, unit);
     }
 
     @Override
     public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                             long initialDelay, long period, TimeUnit unit) {
-        this.checkQueue();
+        this.checkQueue(1);
         return this.thread.scheduleAtFixedRate(command, initialDelay, period, unit);
     }
 
     @Override
     public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                                long initialDelay, long delay, TimeUnit unit) {
-        this.checkQueue();
+        this.checkQueue(1);
         return this.thread.scheduleAtFixedRate(command, initialDelay, delay, unit);
     }
 
     @Override
     public <T> ListenableFuture<T> submit(Callable<T> task) {
-        this.checkQueue();
+        this.checkQueue(1);
         return super.submit(task);
     }
 
     @Override
     public ListenableFuture<?> submit(Runnable task) {
-        this.checkQueue();
+        this.checkQueue(1);
         return super.submit(task);
     }
 
     @Override
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
-        this.checkQueue();
+        this.checkQueue(tasks.size());
         return super.invokeAll(tasks);
     }
 
     @Override
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                          long timeout, TimeUnit unit) throws InterruptedException {
-        this.checkQueue();
+        this.checkQueue(tasks.size());
         return super.invokeAll(tasks, timeout, unit);
     }
 
     @Override
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
-        this.checkQueue();
+        this.checkQueue(tasks.size());
         return super.invokeAny(tasks);
     }
 
     @Override
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
                            TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-        this.checkQueue();
+        this.checkQueue(tasks.size());
         return super.invokeAny(tasks, timeout, unit);
     }
 
     @Override
     public <T> ListenableFuture<T> submit(Runnable task, T result) {
-        this.checkQueue();
+        this.checkQueue(1);
         return super.submit(task, result);
     }
 
     @Override
     public void execute(Runnable command) {
-        this.checkQueue();
+        this.checkQueue(1);
         super.execute(command);
     }
 
-    private void checkQueue() {
-        if (this.maxTasksInQueue > 0 && this.queue.size() >= this.maxTasksInQueue) {
-            throw new RejectedExecutionException("Queue at limit of " + this.maxTasksInQueue + " items");
+    private void checkQueue(int numberOfTasks) {
+        if (maxTasksInQueue > 0 && (queue.size() + numberOfTasks) > maxTasksInQueue) {
+            throw new RejectedExecutionException("Queue at limit of " + maxTasksInQueue + " items");
         }
     }
 
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
new file mode 100644
index 000000000..0f634d0bb
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * This class provides 2 things over the java {@link ExecutorService}.
+ *
+ * <p>1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
+ * This means that exceptions in scheduled tasks won't go unnoticed and will be
+ * logged.
+ *
+ * <p>2. It supports submitting tasks with an ordering key, so that tasks submitted
+ * with the same key will always be executed in order, but tasks across
+ * different keys can be unordered. This retains parallelism while retaining the
+ * basic amount of ordering we want (e.g., per ledger handle). Ordering is
+ * achieved by hashing the key objects to threads by their {@link #hashCode()}
+ * method.
+ */
+@Slf4j
+public class OrderedExecutor implements ExecutorService {
+    public static final int NO_TASK_LIMIT = -1;
+    protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
+
+    final String name;
+    final ExecutorService threads[];
+    final long threadIds[];
+    final Random rand = new Random();
+    final OpStatsLogger taskExecutionStats;
+    final OpStatsLogger taskPendingStats;
+    final boolean traceTaskExecution;
+    final long warnTimeMicroSec;
+    final int maxTasksInQueue;
+
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * A builder class for an OrderedExecutor.
+     */
+    public static class Builder extends AbstractBuilder<OrderedExecutor> {
+
+        @Override
+        public OrderedExecutor build() {
+            if (null == threadFactory) {
+                threadFactory = new DefaultThreadFactory("bookkeeper-ordered-safe-executor");
+            }
+            return new OrderedExecutor(name, numThreads, threadFactory, statsLogger,
+                                           traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
+        }
+    }
+
+    /**
+     * Abstract builder class to build {@link OrderedExecutor} and its subclasses (e.g. {@link OrderedScheduler}).
+     */
+    public abstract static class AbstractBuilder<T extends OrderedExecutor> {
+        protected String name = getClass().getSimpleName();
+        protected int numThreads = Runtime.getRuntime().availableProcessors();
+        protected ThreadFactory threadFactory = null;
+        protected StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+        protected boolean traceTaskExecution = false;
+        protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
+        protected int maxTasksInQueue = NO_TASK_LIMIT;
+
+        public AbstractBuilder<T> name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public AbstractBuilder<T> numThreads(int num) {
+            this.numThreads = num;
+            return this;
+        }
+
+        public AbstractBuilder<T> maxTasksInQueue(int num) {
+            this.maxTasksInQueue = num;
+            return this;
+        }
+
+        public AbstractBuilder<T> threadFactory(ThreadFactory threadFactory) {
+            this.threadFactory = threadFactory;
+            return this;
+        }
+
+        public AbstractBuilder<T> statsLogger(StatsLogger statsLogger) {
+            this.statsLogger = statsLogger;
+            return this;
+        }
+
+        public AbstractBuilder<T> traceTaskExecution(boolean enabled) {
+            this.traceTaskExecution = enabled;
+            return this;
+        }
+
+        public AbstractBuilder<T> traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
+            this.warnTimeMicroSec = warnTimeMicroSec;
+            return this;
+        }
+
+        @SuppressWarnings("unchecked")
+        public T build() {
+            if (null == threadFactory) {
+                threadFactory = new DefaultThreadFactory(name);
+            }
+            return (T) new OrderedExecutor(
+                name,
+                numThreads,
+                threadFactory,
+                statsLogger,
+                traceTaskExecution,
+                warnTimeMicroSec,
+                maxTasksInQueue);
+        }
+    }
+
+    /**
+     * Decorator class for a runnable that measures the execution time.
+     */
+    protected class TimedRunnable implements Runnable {
+        final Runnable runnable;
+        final long initNanos;
+
+        TimedRunnable(Runnable runnable) {
+            this.runnable = runnable;
+            this.initNanos = MathUtils.nowInNano();
+         }
+
+        @Override
+        public void run() {
+            taskPendingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(initNanos), TimeUnit.NANOSECONDS);
+            long startNanos = MathUtils.nowInNano();
+            this.runnable.run();
+            long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
+            taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
+            if (elapsedMicroSec >= warnTimeMicroSec) {
+                log.warn("Runnable {}:{} took too long {} micros to execute.", runnable, runnable.getClass(),
+                        elapsedMicroSec);
+            }
+        }
+    }
+
+    protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
+        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
+    }
+
+    protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
+        return new BoundedExecutorService(executor, this.maxTasksInQueue);
+    }
+
+    /**
+     * Constructs an ordered executor.
+     *
+     * @param numThreads
+     *            - number of threads
+     * @param baseName
+     *            - base name of executor threads
+     * @param threadFactory
+     *            - for constructing threads
+     * @param statsLogger
+     *            - for reporting executor stats
+     * @param traceTaskExecution
+     *            - should we stat task execution
+     * @param warnTimeMicroSec
+     *            - log long task exec warning after this interval
+     * @param maxTasksInQueue
+     *            - maximum items allowed in a thread queue. -1 for no limit
+     */
+    protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
+                                StatsLogger statsLogger, boolean traceTaskExecution,
+                                long warnTimeMicroSec, int maxTasksInQueue) {
+        checkArgument(numThreads > 0);
+        checkArgument(!StringUtils.isBlank(baseName));
+
+        this.maxTasksInQueue = maxTasksInQueue;
+        this.warnTimeMicroSec = warnTimeMicroSec;
+        name = baseName;
+        threads = new ExecutorService[numThreads];
+        threadIds = new long[numThreads];
+        for (int i = 0; i < numThreads; i++) {
+            ThreadPoolExecutor thread = createSingleThreadExecutor(
+                    new ThreadFactoryBuilder().setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
+                    .setThreadFactory(threadFactory).build());
+            threads[i] = getBoundedExecutor(thread);
+
+            final int idx = i;
+            try {
+                threads[idx].submit(() -> {
+                    threadIds[idx] = Thread.currentThread().getId();
+                }).get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Couldn't start thread " + i, e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException("Couldn't start thread " + i, e);
+            }
+
+            // Register gauges
+            statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), new Gauge<Number>() {
+                @Override
+                public Number getDefaultValue() {
+                    return 0;
+                }
+
+                @Override
+                public Number getSample() {
+                    return thread.getQueue().size();
+                }
+            });
+            statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() {
+                @Override
+                public Number getDefaultValue() {
+                    return 0;
+                }
+
+                @Override
+                public Number getSample() {
+                    return thread.getCompletedTaskCount();
+                }
+            });
+            statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge<Number>() {
+                @Override
+                public Number getDefaultValue() {
+                    return 0;
+                }
+
+                @Override
+                public Number getSample() {
+                    return thread.getTaskCount();
+                }
+            });
+        }
+
+        // Stats
+        this.taskExecutionStats = statsLogger.scope(name).getOpStatsLogger("task_execution");
+        this.taskPendingStats = statsLogger.scope(name).getOpStatsLogger("task_queued");
+        this.traceTaskExecution = traceTaskExecution;
+    }
+
+    /**
+     * Schedules a one time action to execute with an ordering guarantee on the key.
+     * @param orderingKey
+     * @param r
+     */
+    public void executeOrdered(Object orderingKey, SafeRunnable r) {
+        chooseThread(orderingKey).execute(timedRunnable(r));
+    }
+
+    /**
+     * Schedules a one time action to execute with an ordering guarantee on the key.
+     * @param orderingKey
+     * @param r
+     */
+    public void executeOrdered(long orderingKey, SafeRunnable r) {
+        chooseThread(orderingKey).execute(timedRunnable(r));
+    }
+
+    /**
+     * Schedules a one time action to execute with an ordering guarantee on the key.
+     * @param orderingKey
+     * @param r
+     */
+    public void executeOrdered(int orderingKey, SafeRunnable r) {
+        chooseThread(orderingKey).execute(timedRunnable(r));
+    }
+
+    public <T> ListenableFuture<T> submitOrdered(long orderingKey, Callable<T> task) {
+        SettableFuture<T> future = SettableFuture.create();
+        executeOrdered(orderingKey, () -> {
+            try {
+                T result = task.call();
+                future.set(result);
+            } catch (Throwable t) {
+                future.setException(t);
+            }
+        });
+
+        return future;
+    }
+
+
+    public long getThreadID(long orderingKey) {
+        // skip hashcode generation in this special case
+        if (threadIds.length == 1) {
+            return threadIds[0];
+        }
+
+        return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
+    }
+
+    public ExecutorService chooseThread() {
+        // skip random # generation in this special case
+        if (threads.length == 1) {
+            return threads[0];
+        }
+
+        return threads[rand.nextInt(threads.length)];
+    }
+
+    public ExecutorService chooseThread(Object orderingKey) {
+        // skip hashcode generation in this special case
+        if (threads.length == 1) {
+            return threads[0];
+        }
+
+        return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)];
+    }
+
+    /**
+     * Chooses the thread for a {@code long} ordering key; no hashcode generation is needed.
+     *
+     * @param orderingKey long ordering key
+     * @return the thread for executing this order key
+     */
+    public ExecutorService chooseThread(long orderingKey) {
+        if (threads.length == 1) {
+            return threads[0];
+        }
+
+        return threads[MathUtils.signSafeMod(orderingKey, threads.length)];
+    }
+
+    private Runnable timedRunnable(Runnable r) {
+        if (traceTaskExecution) {
+            return new TimedRunnable(r);
+        } else {
+            return r;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return chooseThread().submit(task);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return chooseThread().submit(timedRunnable(task), result);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Future<?> submit(Runnable task) {
+        return chooseThread().submit(timedRunnable(task));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException {
+        return chooseThread().invokeAll(tasks);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+                                         long timeout,
+                                         TimeUnit unit)
+        throws InterruptedException {
+        return chooseThread().invokeAll(tasks, timeout, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException, ExecutionException {
+        return chooseThread().invokeAny(tasks);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        return chooseThread().invokeAny(tasks, timeout, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void execute(Runnable command) {
+        chooseThread().execute(command);
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void shutdown() {
+        for (int i = 0; i < threads.length; i++) {
+            threads[i].shutdown();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Runnable> shutdownNow() {
+        List<Runnable> runnables = new ArrayList<Runnable>();
+        for (ExecutorService executor : threads) {
+            runnables.addAll(executor.shutdownNow());
+        }
+        return runnables;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isShutdown() {
+        for (ExecutorService executor : threads) {
+            if (!executor.isShutdown()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        boolean ret = true;
+        for (int i = 0; i < threads.length; i++) {
+            ret = ret && threads[i].awaitTermination(timeout, unit);
+        }
+        return ret;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isTerminated() {
+        for (ExecutorService executor : threads) {
+            if (!executor.isTerminated()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Force threads shutdown (cancel active requests) after specified delay,
+     * to be used after shutdown() rejects new requests.
+     */
+    public void forceShutdown(long timeout, TimeUnit unit) {
+        for (int i = 0; i < threads.length; i++) {
+            try {
+                if (!threads[i].awaitTermination(timeout, unit)) {
+                    threads[i].shutdownNow();
+                }
+            } catch (InterruptedException exception) {
+                threads[i].shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index bf2a6fbc8..6f05832b9 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -17,32 +17,20 @@
  */
 package org.apache.bookkeeper.common.util;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
+
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang.StringUtils;
 
 /**
  * This class provides 2 things over the java {@link ScheduledExecutorService}.
@@ -58,19 +46,7 @@
  * achieved by hashing the key objects to threads by their {@link #hashCode()}
  * method.
  */
-public class OrderedScheduler implements ScheduledExecutorService {
-    public static final int NO_TASK_LIMIT = -1;
-    protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
-
-    final String name;
-    final ListeningScheduledExecutorService threads[];
-    final long threadIds[];
-    final Random rand = new Random();
-    final OpStatsLogger taskExecutionStats;
-    final OpStatsLogger taskPendingStats;
-    final boolean traceTaskExecution;
-    final long warnTimeMicroSec;
-    final int maxTasksInQueue;
+public class OrderedScheduler extends OrderedExecutor implements ScheduledExecutorService {
 
     /**
      * Create a builder to build ordered scheduler.
@@ -84,61 +60,13 @@ public static SchedulerBuilder newSchedulerBuilder() {
     /**
      * Builder to build ordered scheduler.
      */
-    public static class SchedulerBuilder extends AbstractBuilder<OrderedScheduler> {}
-
-    /**
-     * Abstract builder class to build {@link OrderedScheduler}.
-     */
-    public abstract static class AbstractBuilder<T extends OrderedScheduler> {
-        protected String name = getClass().getSimpleName();
-        protected int numThreads = Runtime.getRuntime().availableProcessors();
-        protected ThreadFactory threadFactory = null;
-        protected StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-        protected boolean traceTaskExecution = false;
-        protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
-        protected int maxTasksInQueue = NO_TASK_LIMIT;
-
-        public AbstractBuilder<T> name(String name) {
-            this.name = name;
-            return this;
-        }
-
-        public AbstractBuilder<T> numThreads(int num) {
-            this.numThreads = num;
-            return this;
-        }
-
-        public AbstractBuilder<T> maxTasksInQueue(int num) {
-            this.maxTasksInQueue = num;
-            return this;
-        }
-
-        public AbstractBuilder<T> threadFactory(ThreadFactory threadFactory) {
-            this.threadFactory = threadFactory;
-            return this;
-        }
-
-        public AbstractBuilder<T> statsLogger(StatsLogger statsLogger) {
-            this.statsLogger = statsLogger;
-            return this;
-        }
-
-        public AbstractBuilder<T> traceTaskExecution(boolean enabled) {
-            this.traceTaskExecution = enabled;
-            return this;
-        }
-
-        public AbstractBuilder<T> traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
-            this.warnTimeMicroSec = warnTimeMicroSec;
-            return this;
-        }
-
-        @SuppressWarnings("unchecked")
-        public T build() {
+    public static class SchedulerBuilder extends OrderedExecutor.AbstractBuilder<OrderedScheduler> {
+        @Override
+        public OrderedScheduler build() {
             if (null == threadFactory) {
                 threadFactory = new DefaultThreadFactory(name);
             }
-            return (T) new OrderedScheduler(
+            return new OrderedScheduler(
                 name,
                 numThreads,
                 threadFactory,
@@ -147,32 +75,6 @@ public T build() {
                 warnTimeMicroSec,
                 maxTasksInQueue);
         }
-
-    }
-
-    private class TimedRunnable implements SafeRunnable {
-        final SafeRunnable runnable;
-        final long initNanos;
-
-        TimedRunnable(SafeRunnable runnable) {
-            this.runnable = runnable;
-            this.initNanos = MathUtils.nowInNano();
-         }
-
-        @Override
-        public void safeRun() {
-            taskPendingStats.registerSuccessfulEvent(
-                    MathUtils.elapsedNanos(initNanos),
-                    TimeUnit.NANOSECONDS);
-            long startNanos = MathUtils.nowInNano();
-            this.runnable.safeRun();
-            long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
-            taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
-            if (elapsedMicroSec >= warnTimeMicroSec) {
-                LOGGER.warn("Runnable {}:{} took too long {} micros to execute.",
-                        runnable, runnable.getClass(), elapsedMicroSec);
-            }
-        }
     }
 
     /**
@@ -191,172 +93,40 @@ public void safeRun() {
      * @param warnTimeMicroSec
      *            - log long task exec warning after this interval
      */
-    protected OrderedScheduler(String baseName,
+    private OrderedScheduler(String baseName,
                                int numThreads,
                                ThreadFactory threadFactory,
                                StatsLogger statsLogger,
                                boolean traceTaskExecution,
                                long warnTimeMicroSec,
                                int maxTasksInQueue) {
-        checkArgument(numThreads > 0);
-        checkArgument(!StringUtils.isBlank(baseName));
-
-        this.maxTasksInQueue = maxTasksInQueue;
-        this.warnTimeMicroSec = warnTimeMicroSec;
-        name = baseName;
-        threads = new ListeningScheduledExecutorService[numThreads];
-        threadIds = new long[numThreads];
-        for (int i = 0; i < numThreads; i++) {
-            final ScheduledThreadPoolExecutor thread = new ScheduledThreadPoolExecutor(1,
-                    new ThreadFactoryBuilder()
-                        .setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
-                        .setThreadFactory(threadFactory)
-                        .build());
-            threads[i] = new BoundedScheduledExecutorService(thread, this.maxTasksInQueue);
-
-            final int idx = i;
-            try {
-                threads[idx].submit(new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        threadIds[idx] = Thread.currentThread().getId();
-                    }
-                }).get();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException("Couldn't start thread " + i, e);
-            } catch (ExecutionException e) {
-                throw new RuntimeException("Couldn't start thread " + i, e);
-            }
-
-            // Register gauges
-            statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
-
-                @Override
-                public Number getSample() {
-                    return thread.getQueue().size();
-                }
-            });
-            statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
+        super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
+    }
 
-                @Override
-                public Number getSample() {
-                    return thread.getCompletedTaskCount();
-                }
-            });
-            statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
 
-                @Override
-                public Number getSample() {
-                    return thread.getTaskCount();
-                }
-            });
-        }
+    @Override
+    protected ScheduledThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
+        return new ScheduledThreadPoolExecutor(1, factory);
+    }
 
-        // Stats
-        this.taskExecutionStats = statsLogger.scope(name).getOpStatsLogger("task_execution");
-        this.taskPendingStats = statsLogger.scope(name).getOpStatsLogger("task_queued");
-        this.traceTaskExecution = traceTaskExecution;
+    @Override
+    protected ListeningScheduledExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
+        return new BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, this.maxTasksInQueue);
     }
 
+    @Override
     public ListeningScheduledExecutorService chooseThread() {
-        // skip random # generation in this special case
-        if (threads.length == 1) {
-            return threads[0];
-        }
-
-        return threads[rand.nextInt(threads.length)];
+        return (ListeningScheduledExecutorService) super.chooseThread();
     }
 
+    @Override
     public ListeningScheduledExecutorService chooseThread(Object orderingKey) {
-        // skip hashcode generation in this special case
-        if (threads.length == 1) {
-            return threads[0];
-        }
-
-        return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)];
+        return (ListeningScheduledExecutorService) super.chooseThread(orderingKey);
     }
 
-    /**
-     * skip hashcode generation in this special case.
-     *
-     * @param orderingKey long ordering key
-     * @return the thread for executing this order key
-     */
+    @Override
     public ListeningScheduledExecutorService chooseThread(long orderingKey) {
-        if (threads.length == 1) {
-            return threads[0];
-        }
-
-        return threads[MathUtils.signSafeMod(orderingKey, threads.length)];
-    }
-
-    private SafeRunnable timedRunnable(SafeRunnable r) {
-        if (traceTaskExecution) {
-            return new TimedRunnable(r);
-        } else {
-            return r;
-        }
-    }
-
-    /**
-     * schedules a one time action to execute.
-     */
-    public void submit(SafeRunnable r) {
-        chooseThread().submit(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
-     *
-     * @param orderingKey order key to submit the task
-     * @param r task to run
-     * @return listenable future on the completion of the task
-     */
-    public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
-        return chooseThread(orderingKey).submit(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
-     *
-     * @param orderingKey order key to submit the task
-     * @param r task to run
-     */
-    public void executeOrdered(Object orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the key.
-     *
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(long orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the key.
-     *
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(int orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
+        return (ListeningScheduledExecutorService) super.chooseThread(orderingKey);
     }
 
     /**
@@ -472,91 +242,6 @@ public void submitOrdered(int orderingKey, SafeRunnable r) {
         return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
     }
 
-    protected long getThreadID(long orderingKey) {
-        // skip hashcode generation in this special case
-        if (threadIds.length == 1) {
-            return threadIds[0];
-        }
-
-        return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void shutdown() {
-        for (int i = 0; i < threads.length; i++) {
-            threads[i].shutdown();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Runnable> shutdownNow() {
-        List<Runnable> runnables = new ArrayList<Runnable>();
-        for (ScheduledExecutorService executor : threads) {
-            runnables.addAll(executor.shutdownNow());
-        }
-        return runnables;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isShutdown() {
-        for (ScheduledExecutorService executor : threads) {
-            if (!executor.isShutdown()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-        boolean ret = true;
-        for (int i = 0; i < threads.length; i++) {
-            ret = ret && threads[i].awaitTermination(timeout, unit);
-        }
-        return ret;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isTerminated() {
-        for (ScheduledExecutorService executor : threads) {
-            if (!executor.isTerminated()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Force threads shutdown (cancel active requests) after specified delay,
-     * to be used after shutdown() rejects new requests.
-     */
-    public void forceShutdown(long timeout, TimeUnit unit) {
-        for (int i = 0; i < threads.length; i++) {
-            try {
-                if (!threads[i].awaitTermination(timeout, unit)) {
-                    threads[i].shutdownNow();
-                }
-            } catch (InterruptedException exception) {
-                threads[i].shutdownNow();
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
 
     //
     // Methods for implementing {@link ScheduledExecutorService}
@@ -596,74 +281,4 @@ public void forceShutdown(long timeout, TimeUnit unit) {
         return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-        return chooseThread().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-        return chooseThread().submit(task, result);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Future<?> submit(Runnable task) {
-        return chooseThread().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-        return chooseThread().invokeAll(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
-                                         long timeout,
-                                         TimeUnit unit)
-        throws InterruptedException {
-        return chooseThread().invokeAll(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException, ExecutionException {
-        return chooseThread().invokeAny(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        return chooseThread().invokeAny(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void execute(Runnable command) {
-        chooseThread().execute(command);
-    }
-
 }
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
index 89214e302..6a52ef597 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
@@ -94,7 +94,7 @@ private Retries() {
                 scheduler,
                 null);
         } else {
-            scheduler.submitOrdered(key, () -> {
+            scheduler.executeOrdered(key, () -> {
                 execute(
                     future,
                     backoffs.iterator(),
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 546a8bafa..d5f6c8dcd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -88,6 +88,7 @@
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.UpdateLedgerOp;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
@@ -112,7 +113,6 @@
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.LedgerIdFormatter;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.Tool;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -814,7 +814,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
                 } else {
                     // Use BookieClient to target a specific bookie
                     EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-                    OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+                    OrderedExecutor executor = OrderedExecutor.newBuilder()
                         .numThreads(1)
                         .name("BookieClientScheduler")
                         .build();
@@ -1022,6 +1022,7 @@ long getLedgerId() {
             return ledgerId;
         }
 
+        @Override
         public void operationComplete(int rc, LedgerMetadata result) {
             if (rc != 0) {
                 setException(BKException.create(rc));
@@ -2489,6 +2490,7 @@ String getDescription() {
             return "Convert bookie indexes from InterleavedStorage to DbLedgerStorage format";
         }
 
+        @Override
         String getUsage() {
             return CMD_CONVERT_TO_DB_STORAGE;
         }
@@ -2584,6 +2586,7 @@ String getDescription() {
             return "Convert bookie indexes from DbLedgerStorage to InterleavedStorage format";
         }
 
+        @Override
         String getUsage() {
             return CMD_CONVERT_TO_INTERLEAVED_STORAGE;
         }
@@ -2699,6 +2702,7 @@ String getDescription() {
             return "Rebuild DbLedgerStorage locations index by scanning the entry logs";
         }
 
+        @Override
         String getUsage() {
             return CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX;
         }
@@ -3116,6 +3120,7 @@ public void remove() {
         };
 
         return new Iterable<SortedMap<Long, Long>>() {
+            @Override
             public Iterator<SortedMap<Long, Long>> iterator() {
                 return iterator;
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index f0c524998..98626657f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -57,6 +57,8 @@
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.Feature;
@@ -79,7 +81,6 @@
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.configuration.ConfigurationException;
@@ -134,8 +135,8 @@
     final BookieClient bookieClient;
     final BookieWatcher bookieWatcher;
 
-    final OrderedSafeExecutor mainWorkerPool;
-    final ScheduledExecutorService scheduler;
+    final OrderedExecutor mainWorkerPool;
+    final OrderedScheduler scheduler;
     final HashedWheelTimer requestTimer;
     final boolean ownTimer;
     final FeatureProvider featureProvider;
@@ -421,9 +422,8 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
         this.reorderReadSequence = conf.isReorderReadSequenceEnabled();
 
         // initialize resources
-        this.scheduler = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("BookKeeperClientScheduler"));
-        this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
+        this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
+        this.mainWorkerPool = OrderedExecutor.newBuilder()
                 .name("BookKeeperClientWorker")
                 .numThreads(conf.getNumWorkerThreads())
                 .statsLogger(statsLogger)
@@ -672,12 +672,12 @@ BookieWatcher getBookieWatcher() {
     }
 
     @VisibleForTesting
-    OrderedSafeExecutor getMainWorkerPool() {
+    OrderedExecutor getMainWorkerPool() {
         return mainWorkerPool;
     }
 
     @VisibleForTesting
-    ScheduledExecutorService getScheduler() {
+    OrderedScheduler getScheduler() {
         return scheduler;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index c77843a6c..96c8998db 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -116,7 +116,7 @@ public String toString() {
                 }
             };
             try {
-                scheduledFuture = lh.bk.getMainWorkerPool().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
+                scheduledFuture = lh.bk.getScheduler().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
                         explicitLacIntervalInMs, explicitLacIntervalInMs, TimeUnit.MILLISECONDS);
             } catch (RejectedExecutionException re) {
                 LOG.error("Scheduling of ExplictLastAddConfirmedFlush for ledger: {} has failed because of {}",
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
index 117e35989..45ec4255a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
@@ -30,7 +30,7 @@
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.apache.bookkeeper.versioning.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +39,7 @@
  * Encapsulates asynchronous ledger delete operation.
  *
  */
-class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> {
+class LedgerDeleteOp extends OrderedGenericCallback<Void> {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerDeleteOp.class);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 5c2ff8c6d..3f599e28a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -46,7 +46,7 @@
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.apache.zookeeper.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -416,7 +416,7 @@ public void operationComplete(int rc, Void result) {
                 // try again, the previous success (with which this has
                 // conflicted) will have updated the stat other operations
                 // such as (addEnsemble) would update it too.
-                lh.rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
+                lh.rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(
                                 lh.bk.mainWorkerPool, lh.getId()) {
                             @Override
                             public void safeOperationComplete(int rc,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 8db566739..a9a85bc03 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -83,7 +83,7 @@
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.collections4.IteratorUtils;
 import org.slf4j.Logger;
@@ -176,6 +176,7 @@
         this.bookieFailureHistory = CacheBuilder.newBuilder()
             .expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
             .build(new CacheLoader<BookieSocketAddress, Long>() {
+            @Override
             public Long load(BookieSocketAddress key) {
                 return -1L;
             }
@@ -200,10 +201,12 @@ public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
         lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
         bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS,
                                           new Gauge<Integer>() {
-                                              public Integer getDefaultValue() {
+                                              @Override
+                                            public Integer getDefaultValue() {
                                                   return 0;
                                               }
-                                              public Integer getSample() {
+                                              @Override
+                                            public Integer getSample() {
                                                   return pendingAddOps.size();
                                               }
                                           });
@@ -240,6 +243,7 @@ protected void initializeExplicitLacFlushPolicy() {
      *
      * @return the id of the ledger
      */
+    @Override
     public long getId() {
         return ledgerId;
     }
@@ -344,6 +348,7 @@ synchronized long addToLength(long delta) {
      *
      * @return the length of the ledger in bytes
      */
+    @Override
     public synchronized long getLength() {
         return this.length;
     }
@@ -453,7 +458,7 @@ void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc)
      * @param rc
      */
     void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
-        bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() {
+        bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
                 final long prevLastEntryId;
@@ -512,7 +517,7 @@ public void safeRun() {
                               + metadata.getLastEntryId() + " with this many bytes: " + metadata.getLength());
                 }
 
-                final class CloseCb extends OrderedSafeGenericCallback<Void> {
+                final class CloseCb extends OrderedGenericCallback<Void> {
                     CloseCb() {
                         super(bk.getMainWorkerPool(), ledgerId);
                     }
@@ -520,7 +525,7 @@ public void safeRun() {
                     @Override
                     public void safeOperationComplete(final int rc, Void result) {
                         if (rc == BKException.Code.MetadataVersionException) {
-                            rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
+                            rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
                                                                                           ledgerId) {
                                 @Override
                                 public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
@@ -830,7 +835,7 @@ public LedgerEntry readLastEntry()
                                                               boolean isRecoveryRead) {
         PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), firstEntry, lastEntry, isRecoveryRead);
         if (!bk.isClosed()) {
-            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } else {
             op.future().completeExceptionally(BKException.create(ClientClosedException));
         }
@@ -1099,7 +1104,7 @@ public String toString() {
         }
 
         try {
-            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
             op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
                     LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
@@ -1689,7 +1694,7 @@ public String toString() {
      * reformed ensemble. On MetadataVersionException, will reread latest
      * ledgerMetadata and act upon.
      */
-    private final class ChangeEnsembleCb extends OrderedSafeGenericCallback<Void> {
+    private final class ChangeEnsembleCb extends OrderedGenericCallback<Void> {
         private final EnsembleInfo ensembleInfo;
         private final int curBlockAddCompletions;
         private final int ensembleChangeIdx;
@@ -1749,7 +1754,7 @@ public String toString() {
      * Callback which is reading the ledgerMetadata present in zk. This will try
      * to resolve the version conflicts.
      */
-    private final class ReReadLedgerMetadataCb extends OrderedSafeGenericCallback<LedgerMetadata> {
+    private final class ReReadLedgerMetadataCb extends OrderedGenericCallback<LedgerMetadata> {
         private final int rc;
         private final EnsembleInfo ensembleInfo;
         private final int curBlockAddCompletions;
@@ -2002,11 +2007,11 @@ void recover(GenericCallback<Void> finalCb,
             return;
         }
 
-        writeLedgerConfig(new OrderedSafeGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
+        writeLedgerConfig(new OrderedGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
             @Override
             public void safeOperationComplete(final int rc, Void result) {
                 if (rc == BKException.Code.MetadataVersionException) {
-                    rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
+                    rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
                                                                                   ledgerId) {
                         @Override
                         public void safeOperationComplete(int rc, LedgerMetadata newMeta) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index afd56cf2e..0eaf0b5d3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -238,7 +238,7 @@ public String toString() {
         }
 
         try {
-            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
             op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
                               LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 16b577b65..33cfaf266 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -38,7 +38,7 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -183,7 +183,7 @@ public void operationComplete(int rc, LedgerMetadata metadata) {
         }
 
         if (doRecovery) {
-            lh.recover(new OrderedSafeGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
+            lh.recover(new OrderedGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
                 @Override
                 public void safeOperationComplete(int rc, Void result) {
                     if (rc == BKException.Code.OK) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index dd5058ef0..eb9576633 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -143,7 +143,7 @@ boolean maybeTimeout() {
 
     void timeoutQuorumWait() {
         try {
-            lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, new SafeRunnable() {
+            lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     if (completed) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 484e9c734..a41207d53 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -505,7 +505,7 @@ PendingReadOp parallelRead(boolean enabled) {
     }
 
     public void submit() {
-        lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, this);
+        lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
     }
 
     void initiate() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 9d62f72d7..eef0e70ea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -163,7 +163,7 @@ public void onChanged(long lid, LedgerMetadata newMetadata) {
         }
         if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
             try {
-                bk.getMainWorkerPool().submitOrdered(ledgerId, new MetadataUpdater(newMetadata));
+                bk.getMainWorkerPool().executeOrdered(ledgerId, new MetadataUpdater(newMetadata));
             } catch (RejectedExecutionException ree) {
                 LOG.error("Failed on submitting updater to update ledger metadata on ledger {} : {}",
                         ledgerId, newMetadata);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 285a06aa7..c160b204a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -49,6 +49,7 @@
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -63,7 +64,6 @@
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +77,7 @@
     // This is global state that should be across all BookieClients
     AtomicLong totalBytesOutstanding = new AtomicLong();
 
-    OrderedSafeExecutor executor;
+    OrderedExecutor executor;
     ScheduledExecutorService scheduler;
     ScheduledFuture<?> timeoutFuture;
 
@@ -97,7 +97,7 @@
     private final long bookieErrorThresholdPerInterval;
 
     public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
-                        OrderedSafeExecutor executor, ScheduledExecutorService scheduler,
+                        OrderedExecutor executor, ScheduledExecutorService scheduler,
                         StatsLogger statsLogger) throws IOException {
         this.conf = conf;
         this.eventLoopGroup = eventLoopGroup;
@@ -198,7 +198,7 @@ public void writeLac(final BookieSocketAddress addr, final long ledgerId, final
         client.obtain((rc, pcbc) -> {
             if (rc != BKException.Code.OK) {
                 try {
-                    executor.submitOrdered(ledgerId, safeRun(() -> {
+                    executor.executeOrdered(ledgerId, safeRun(() -> {
                         cb.writeLacComplete(rc, ledgerId, addr, ctx);
                     }));
                 } catch (RejectedExecutionException re) {
@@ -219,7 +219,7 @@ private void completeAdd(final int rc,
                              final WriteCallback cb,
                              final Object ctx) {
         try {
-            executor.submitOrdered(ledgerId, new SafeRunnable() {
+            executor.executeOrdered(ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
@@ -266,7 +266,7 @@ private void completeRead(final int rc,
                               final ReadEntryCallback cb,
                               final Object ctx) {
         try {
-            executor.submitOrdered(ledgerId, new SafeRunnable() {
+            executor.executeOrdered(ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx);
@@ -362,7 +362,7 @@ public void readLac(final BookieSocketAddress addr, final long ledgerId, final R
         client.obtain((rc, pcbc) -> {
             if (rc != BKException.Code.OK) {
                 try {
-                    executor.submitOrdered(ledgerId, safeRun(() -> {
+                    executor.executeOrdered(ledgerId, safeRun(() -> {
                         cb.readLacComplete(rc, ledgerId, null, null, ctx);
                     }));
                 } catch (RejectedExecutionException re) {
@@ -526,7 +526,7 @@ public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress a
         byte hello[] = "hello".getBytes(UTF_8);
         long ledger = Long.parseLong(args[2]);
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
-        OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+        OrderedExecutor executor = OrderedExecutor.newBuilder()
                 .name("BookieClientWorker")
                 .numThreads(1)
                 .build();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 94a43a956..7e65ced11 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -59,7 +59,7 @@
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.stats.Counter;
@@ -68,7 +68,6 @@
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,12 +92,12 @@
     /**
      * The threadpool used to execute all read entry requests issued to this server.
      */
-    private final OrderedSafeExecutor readThreadPool;
+    private final OrderedExecutor readThreadPool;
 
     /**
      * The threadpool used to execute all add entry requests issued to this server.
      */
-    private final OrderedSafeExecutor writeThreadPool;
+    private final OrderedExecutor writeThreadPool;
 
     /**
      * TLS management.
@@ -109,12 +108,12 @@
      * The threadpool used to execute all long poll requests issued to this server
      * after they are done waiting.
      */
-    private final OrderedSafeExecutor longPollThreadPool;
+    private final OrderedExecutor longPollThreadPool;
 
     /**
      * The threadpool used to execute high priority requests.
      */
-    private final OrderedSafeExecutor highPriorityThreadPool;
+    private final OrderedExecutor highPriorityThreadPool;
 
     /**
      * The Timer used to time out requests for long polling.
@@ -162,11 +161,11 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
         this.longPollThreadPool = createExecutor(
                 this.serverCfg.getNumLongPollWorkerThreads(),
                 "BookieLongPollThread-" + serverCfg.getBookiePort(),
-                OrderedScheduler.NO_TASK_LIMIT, statsLogger);
+                OrderedExecutor.NO_TASK_LIMIT, statsLogger);
         this.highPriorityThreadPool = createExecutor(
                 this.serverCfg.getNumHighPriorityWorkerThreads(),
                 "BookieHighPriorityThread-" + serverCfg.getBookiePort(),
-                OrderedScheduler.NO_TASK_LIMIT, statsLogger);
+                OrderedExecutor.NO_TASK_LIMIT, statsLogger);
         this.requestTimer = new HashedWheelTimer(
             new ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
             this.serverCfg.getRequestTimerTickDurationMs(),
@@ -208,7 +207,7 @@ public void close() {
         shutdownExecutor(highPriorityThreadPool);
     }
 
-    private OrderedSafeExecutor createExecutor(
+    private OrderedExecutor createExecutor(
             int numThreads,
             String nameFormat,
             int maxTasksInQueue,
@@ -216,7 +215,7 @@ private OrderedSafeExecutor createExecutor(
         if (numThreads <= 0) {
             return null;
         } else {
-            return OrderedSafeExecutor.newBuilder()
+            return OrderedExecutor.newBuilder()
                     .numThreads(numThreads)
                     .name(nameFormat)
                     .traceTaskExecution(serverCfg.getEnableTaskExecutionStats())
@@ -226,7 +225,7 @@ private OrderedSafeExecutor createExecutor(
         }
     }
 
-    private void shutdownExecutor(OrderedSafeExecutor service) {
+    private void shutdownExecutor(OrderedExecutor service) {
         if (null != service) {
             service.shutdown();
         }
@@ -310,7 +309,7 @@ private void processWriteLacRequestV3(final BookkeeperProtocol.Request r, final
         if (null == writeThreadPool) {
             writeLac.run();
         } else {
-            writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), writeLac);
+            writeThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), writeLac);
         }
     }
 
@@ -319,14 +318,14 @@ private void processReadLacRequestV3(final BookkeeperProtocol.Request r, final C
         if (null == readThreadPool) {
             readLac.run();
         } else {
-            readThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), readLac);
+            readThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), readLac);
         }
     }
 
     private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
         WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
 
-        final OrderedSafeExecutor threadPool;
+        final OrderedExecutor threadPool;
         if (RequestUtils.isHighPriority(r)) {
             threadPool = highPriorityThreadPool;
         } else {
@@ -337,7 +336,7 @@ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Chann
             write.run();
         } else {
             try {
-                threadPool.submitOrdered(r.getAddRequest().getLedgerId(), write);
+                threadPool.executeOrdered(r.getAddRequest().getLedgerId(), write);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests",
@@ -361,7 +360,7 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Chan
         ExecutorService fenceThread = null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
 
         final ReadEntryProcessorV3 read;
-        final OrderedSafeExecutor threadPool;
+        final OrderedExecutor threadPool;
         if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
             ExecutorService lpThread = null == longPollThreadPool ? null : longPollThreadPool.chooseThread(c);
 
@@ -387,7 +386,7 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Chan
             read.run();
         } else {
             try {
-                threadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
+                threadPool.executeOrdered(r.getReadRequest().getLedgerId(), read);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests",
@@ -464,7 +463,7 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Ch
 
         // If it's a high priority add (usually as part of recovery process), we want to make sure it gets
         // executed as fast as possible, so bypass the normal writeThreadPool and execute in highPriorityThreadPool
-        final OrderedSafeExecutor threadPool;
+        final OrderedExecutor threadPool;
         if (r.isHighPriority()) {
             threadPool = highPriorityThreadPool;
         } else {
@@ -475,7 +474,7 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Ch
             write.run();
         } else {
             try {
-                threadPool.submitOrdered(r.getLedgerId(), write);
+                threadPool.executeOrdered(r.getLedgerId(), write);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", r.ledgerId,
@@ -494,7 +493,7 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Channe
         // If it's a high priority read (fencing or as part of recovery process), we want to make sure it
         // gets executed as fast as possible, so bypass the normal readThreadPool
         // and execute in highPriorityThreadPool
-        final OrderedSafeExecutor threadPool;
+        final OrderedExecutor threadPool;
         if (r.isHighPriority() || r.isFencing()) {
             threadPool = highPriorityThreadPool;
         } else {
@@ -505,7 +504,7 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Channe
             read.run();
         } else {
             try {
-                threadPool.submitOrdered(r.getLedgerId(), read);
+                threadPool.executeOrdered(r.getLedgerId(), read);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", r.ledgerId,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 439320b38..20fcdda09 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -85,6 +85,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -120,7 +121,6 @@
 import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
@@ -149,7 +149,7 @@
 
     final BookieSocketAddress addr;
     final EventLoopGroup eventLoopGroup;
-    final OrderedSafeExecutor executor;
+    final OrderedExecutor executor;
     final long addEntryTimeoutNanos;
     final long readEntryTimeoutNanos;
     final int maxFrameSize;
@@ -213,13 +213,13 @@
     private final ExtensionRegistry extRegistry;
     private final SecurityHandlerFactory shFactory;
 
-    public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup,
+    public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
                                   BookieSocketAddress addr) throws SecurityException {
         this(new ClientConfiguration(), executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE, null, null,
                 null);
     }
 
-    public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup,
+    public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
                                   BookieSocketAddress addr,
                                   ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry) throws SecurityException {
@@ -227,7 +227,7 @@ public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup event
                 authProviderFactory, extRegistry, null);
     }
 
-    public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
+    public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
                                   EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
                                   StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
@@ -236,7 +236,7 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor exec
                 authProviderFactory, extRegistry, pcbcPool, null);
     }
 
-    public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
+    public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
                                   EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
                                   StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
@@ -1111,7 +1111,7 @@ private void readV2Response(final BookieProtocol.Response response) {
             response.release();
         } else {
             long orderingKey = completionValue.ledgerId;
-            executor.submitOrdered(orderingKey,
+            executor.executeOrdered(orderingKey,
                     ReadV2ResponseCallback.create(completionValue, response.ledgerId, response.entryId,
                                                   status, response));
         }
@@ -1226,7 +1226,7 @@ private void readV3Response(final Response response) {
             }
         } else {
             long orderingKey = completionValue.ledgerId;
-            executor.submitOrdered(orderingKey, new SafeRunnable() {
+            executor.executeOrdered(orderingKey, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     completionValue.handleV3Response(response);
@@ -1400,7 +1400,7 @@ public void setOutstanding() {
         }
 
         protected void errorOutAndRunCallback(final Runnable callback) {
-            executor.submitOrdered(ledgerId,
+            executor.executeOrdered(ledgerId,
                     new SafeRunnable() {
                         @Override
                         public void safeRun() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java
new file mode 100644
index 000000000..73150ad01
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic callback implementation which will run the
+ * callback in the thread which matches the ordering key.
+ */
+public abstract class OrderedGenericCallback<T> implements GenericCallback<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(OrderedGenericCallback.class);
+
+    private final OrderedExecutor executor;
+    private final long orderingKey;
+
+    /**
+     * @param executor The executor on which to run the callback
+     * @param orderingKey Key used to decide which thread the callback
+     *                    should run on.
+     */
+    public OrderedGenericCallback(OrderedExecutor executor, long orderingKey) {
+        this.executor = executor;
+        this.orderingKey = orderingKey;
+    }
+
+    @Override
+    public final void operationComplete(final int rc, final T result) {
+        // During closing, callbacks that have errored out might try to submit to
+        // the scheduler again. If the submission would go to the same thread, we
+        // don't need to submit to the executor again. This is also an optimization
+        // for callback submission.
+        if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
+            safeOperationComplete(rc, result);
+        } else {
+            try {
+                executor.executeOrdered(orderingKey, new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        safeOperationComplete(rc, result);
+                    }
+                    @Override
+                    public String toString() {
+                        return String.format("Callback(key=%s, name=%s)",
+                                             orderingKey,
+                                             OrderedGenericCallback.this);
+                    }
+                });
+            } catch (RejectedExecutionException re) {
+                LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
+            }
+        }
+    }
+
+    public abstract void safeOperationComplete(int rc, T result);
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
deleted file mode 100644
index f3af7df76..000000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.util;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides 2 things over the java {@link ScheduledExecutorService}.
- *
- * <p>1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
- * This means that exceptions in scheduled tasks wont go unnoticed and will be
- * logged.
- *
- * <p>2. It supports submitting tasks with an ordering key, so that tasks submitted
- * with the same key will always be executed in order, but tasks across
- * different keys can be unordered. This retains parallelism while retaining the
- * basic amount of ordering we want (e.g. , per ledger handle). Ordering is
- * achieved by hashing the key objects to threads by their {@link #hashCode()}
- * method.
- *
- * <p>Note: deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.OrderedScheduler}.
- */
-public class OrderedSafeExecutor extends org.apache.bookkeeper.common.util.OrderedScheduler {
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * A builder class for an OrderedSafeExecutor.
-     */
-    public static class Builder extends AbstractBuilder<OrderedSafeExecutor> {
-
-        public OrderedSafeExecutor build() {
-            if (null == threadFactory) {
-                threadFactory = new DefaultThreadFactory("bookkeeper-ordered-safe-executor");
-            }
-            return new OrderedSafeExecutor(name, numThreads, threadFactory, statsLogger,
-                                           traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
-        }
-    }
-
-    /**
-     * Constructs Safe executor.
-     *
-     * @param numThreads
-     *            - number of threads
-     * @param baseName
-     *            - base name of executor threads
-     * @param threadFactory
-     *            - for constructing threads
-     * @param statsLogger
-     *            - for reporting executor stats
-     * @param traceTaskExecution
-     *            - should we stat task execution
-     * @param warnTimeMicroSec
-     *            - log long task exec warning after this interval
-     * @param maxTasksInQueue
-     *            - maximum items allowed in a thread queue. -1 for no limit
-     */
-    private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
-                                StatsLogger statsLogger, boolean traceTaskExecution,
-                                long warnTimeMicroSec, int maxTasksInQueue) {
-        super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
-    }
-
-    /**
-     * Schedules a one time action to execute.
-     */
-    public void submit(SafeRunnable r) {
-        super.submit(r);
-    }
-
-    /**
-     * Schedules a one time action to execute with an ordering guarantee on the key.
-     * @param orderingKey
-     * @param r
-     */
-    public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
-        return super.submitOrdered(orderingKey, r);
-    }
-
-    /**
-     * Schedules a one time action to execute with an ordering guarantee on the key.
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(long orderingKey, SafeRunnable r) {
-        super.submitOrdered(orderingKey, r);
-    }
-
-    /**
-     * Schedules a one time action to execute with an ordering guarantee on the key.
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(int orderingKey, SafeRunnable r) {
-        super.submitOrdered(orderingKey, r);
-    }
-
-    /**
-     * Creates and executes a one-shot action that becomes enabled after the given delay.
-     *
-     * @param command - the SafeRunnable to execute
-     * @param delay - the time from now to delay execution
-     * @param unit - the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of the task and whose get() method
-     *      will return null upon completion
-     */
-    public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
-        return super.schedule(command, delay, unit);
-    }
-
-    /**
-     * Creates and executes a one-shot action that becomes enabled after the given delay.
-     *
-     * @param orderingKey - the key used for ordering
-     * @param command - the SafeRunnable to execute
-     * @param delay - the time from now to delay execution
-     * @param unit - the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of the task and whose get() method
-     *      will return null upon completion
-     */
-    public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
-        return super.scheduleOrdered(orderingKey, command, delay, unit);
-    }
-
-    /**
-     * Creates and executes a periodic action that becomes enabled first after
-     * the given initial delay, and subsequently with the given period.
-     *
-     * <p>For more details check scheduleAtFixedRate in interface ScheduledExecutorService
-     *
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param period - the period between successive executions
-     * @param unit - the time unit of the initialDelay and period parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get()
-     *      method will throw an exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
-        return super.scheduleAtFixedRate(command, initialDelay, period, unit);
-    }
-
-    /**
-     * Creates and executes a periodic action that becomes enabled first after
-     * the given initial delay, and subsequently with the given period.
-     *
-     * <p>For more details check scheduleAtFixedRate in interface ScheduledExecutorService
-     *
-     * @param orderingKey - the key used for ordering
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param period - the period between successive executions
-     * @param unit - the time unit of the initialDelay and period parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
-     * will throw an exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
-            long period, TimeUnit unit) {
-        return super.scheduleAtFixedRateOrdered(orderingKey, command, initialDelay, period, unit);
-    }
-
-    /**
-     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
-     * with the given delay between the termination of one execution and the commencement of the next.
-     *
-     * <p>For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
-     *
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param delay - the delay between the termination of one execution and the commencement of the next
-     * @param unit - the time unit of the initialDelay and delay parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
-     * will throw an exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
-            TimeUnit unit) {
-        return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
-    }
-
-    /**
-     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
-     * with the given delay between the termination of one execution and the commencement of the next.
-     *
-     * <p>For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
-     *
-     * @param orderingKey - the key used for ordering
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param delay - the delay between the termination of one execution and the commencement of the next
-     * @param unit - the time unit of the initialDelay and delay parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
-     * will throw an exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
-            long delay, TimeUnit unit) {
-        return super.scheduleWithFixedDelayOrdered(orderingKey, command, initialDelay, delay, unit);
-    }
-
-    /**
-     * Generic callback implementation which will run the
-     * callback in the thread which matches the ordering key.
-     */
-    public abstract static class OrderedSafeGenericCallback<T>
-            implements GenericCallback<T> {
-        private static final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class);
-
-        private final OrderedSafeExecutor executor;
-        private final long orderingKey;
-
-        /**
-         * @param executor The executor on which to run the callback
-         * @param orderingKey Key used to decide which thread the callback
-         *                    should run on.
-         */
-        public OrderedSafeGenericCallback(OrderedSafeExecutor executor, long orderingKey) {
-            this.executor = executor;
-            this.orderingKey = orderingKey;
-        }
-
-        @Override
-        public final void operationComplete(final int rc, final T result) {
-            // during closing, callbacks that are error out might try to submit to
-            // the scheduler again. if the submission will go to same thread, we
-            // don't need to submit to executor again. this is also an optimization for
-            // callback submission
-            if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
-                safeOperationComplete(rc, result);
-            } else {
-                try {
-                    executor.submitOrdered(orderingKey, new SafeRunnable() {
-                        @Override
-                        public void safeRun() {
-                            safeOperationComplete(rc, result);
-                        }
-                        @Override
-                        public String toString() {
-                            return String.format("Callback(key=%s, name=%s)",
-                                                 orderingKey,
-                                                 OrderedSafeGenericCallback.this);
-                        }
-                    });
-                } catch (RejectedExecutionException re) {
-                    LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
-                }
-            }
-        }
-
-        public abstract void safeOperationComplete(int rc, T result);
-    }
-}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index e686742c5..5efd7bde7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -29,8 +29,10 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Optional;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -41,15 +43,16 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
@@ -61,7 +64,6 @@
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
 import org.mockito.invocation.InvocationOnMock;
@@ -76,8 +78,8 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(MockBookKeeperTestCase.class);
 
-    protected ScheduledExecutorService scheduler;
-    protected OrderedSafeExecutor executor;
+    protected OrderedScheduler scheduler;
+    protected OrderedExecutor executor;
     protected BookKeeper bk;
     protected BookieClient bookieClient;
     protected LedgerManager ledgerManager;
@@ -128,8 +130,8 @@ public void setup() throws Exception {
         mockLedgerData = new ConcurrentHashMap<>();
         mockNextLedgerId = new AtomicLong(1);
         fencedLedgers = new ConcurrentSkipListSet<>();
-        scheduler = new ScheduledThreadPoolExecutor(4);
-        executor = OrderedSafeExecutor.newBuilder().build();
+        scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(4).name("bk-test").build();
+        executor = OrderedExecutor.newBuilder().build();
         bookieWatcher = mock(BookieWatcher.class);
 
         bookieClient = mock(BookieClient.class);
@@ -312,7 +314,7 @@ private void setupReadLedgerMetadata() {
         doAnswer(invocation -> {
             Object[] args = invocation.getArguments();
             Long ledgerId = (Long) args[0];
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[1];
                 LedgerMetadata ledgerMetadata = mockLedgerMetadataRegistry.get(ledgerId);
                 if (ledgerMetadata == null) {
@@ -330,7 +332,7 @@ private void setupRemoveLedgerMetadata() {
         doAnswer(invocation -> {
             Object[] args = invocation.getArguments();
             Long ledgerId = (Long) args[0];
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
                 if (mockLedgerMetadataRegistry.remove(ledgerId) != null) {
                     cb.operationComplete(BKException.Code.OK, null);
@@ -368,7 +370,7 @@ private void setupCreateLedgerMetadata() {
             Object[] args = invocation.getArguments();
             BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
             Long ledgerId = (Long) args[0];
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 LedgerMetadata ledgerMetadata = (LedgerMetadata) args[1];
                 mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(ledgerMetadata));
                 cb.operationComplete(BKException.Code.OK, null);
@@ -384,7 +386,7 @@ private void setupWriteLedgerMetadata() {
             Long ledgerId = (Long) args[0];
             LedgerMetadata metadata = (LedgerMetadata) args[1];
             BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(metadata));
                 cb.operationComplete(BKException.Code.OK, null);
             });
@@ -403,7 +405,7 @@ protected void setupBookieClientReadEntry() {
                 (BookkeeperInternalCallbacks.ReadEntryCallback) args[3];
             boolean fenced = (((Integer) args[5]) & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING;
 
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 DigestManager macManager = null;
                 try {
                     macManager = getDigestType(ledgerId);
@@ -478,7 +480,7 @@ protected void setupBookieClientAddEntry() {
             boolean isRecoveryAdd =
                 ((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD;
 
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 byte[] entry;
                 try {
                     entry = extractEntryPayload(ledgerId, entryId, toSend);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index bf6c2305f..a91dffae3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -34,6 +34,7 @@
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -41,7 +42,6 @@
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,7 +56,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(TestGetBookieInfoTimeout.class);
     DigestType digestType;
     public EventLoopGroup eventLoopGroup;
-    public OrderedSafeExecutor executor;
+    public OrderedExecutor executor;
     private ScheduledExecutorService scheduler;
 
     public TestGetBookieInfoTimeout() {
@@ -69,7 +69,7 @@ public void setUp() throws Exception {
         super.setUp();
         eventLoopGroup = new NioEventLoopGroup();
 
-        executor = OrderedSafeExecutor.newBuilder()
+        executor = OrderedExecutor.newBuilder()
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
                 .build();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 59811d119..a07acef81 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -38,6 +38,7 @@
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.auth.TestAuth;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -49,7 +50,6 @@
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.Test;
 
 /**
@@ -64,7 +64,7 @@
     ExtensionRegistry extRegistry = ExtensionRegistry.newInstance();
     ClientAuthProvider.Factory authProvider;
     EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-    OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient")
+    OrderedExecutor executor = OrderedExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient")
             .build();
 
     public TestBackwardCompatCMS42() throws Exception {
@@ -191,7 +191,7 @@ CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception {
         final CountDownLatch connected = new CountDownLatch(1);
 
         CompatClient42(ClientConfiguration conf,
-                       OrderedSafeExecutor executor,
+                       OrderedExecutor executor,
                        EventLoopGroup eventLoopGroup,
                        BookieSocketAddress addr,
                        ClientAuthProvider.Factory authProviderFactory,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 90788b1e7..d96dce104 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -39,6 +39,7 @@
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -46,7 +47,6 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -79,7 +79,7 @@ public TestPerChannelBookieClient() throws Exception {
     @Test
     public void testConnectCloseRace() throws Exception {
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        OrderedSafeExecutor executor = getOrderedSafeExecutor();
+        OrderedExecutor executor = getOrderedSafeExecutor();
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 1000; i++) {
@@ -98,8 +98,8 @@ public void operationComplete(int rc, PerChannelBookieClient client) {
         executor.shutdown();
     }
 
-    public OrderedSafeExecutor getOrderedSafeExecutor() {
-        return OrderedSafeExecutor.newBuilder()
+    public OrderedExecutor getOrderedSafeExecutor() {
+        return OrderedExecutor.newBuilder()
             .name("PCBC")
             .numThreads(1)
             .traceTaskExecution(true)
@@ -122,7 +122,7 @@ public void operationComplete(int rc, PerChannelBookieClient pcbc) {
             }
         };
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        OrderedSafeExecutor executor = getOrderedSafeExecutor();
+        OrderedExecutor executor = getOrderedSafeExecutor();
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 100; i++) {
@@ -154,7 +154,7 @@ public void operationComplete(int rc, PerChannelBookieClient client) {
         };
         final int iterations = 100000;
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        OrderedSafeExecutor executor = getOrderedSafeExecutor();
+        OrderedExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
         final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
@@ -250,7 +250,7 @@ public ByteBuf readEntry(long ledgerId, long entryId)
         bs.add(startBookie(conf, delayBookie));
 
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        final OrderedSafeExecutor executor = getOrderedSafeExecutor();
+        final OrderedExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
         final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
@@ -268,7 +268,7 @@ public void readEntryComplete(int rc, long ledgerId, long entryId,
             @Override
             public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
                 if (rc != BKException.Code.OK) {
-                    executor.submitOrdered(1, new SafeRunnable() {
+                    executor.executeOrdered(1, new SafeRunnable() {
                         @Override
                         public void safeRun() {
                             cb.readEntryComplete(rc, 1, 1, null, null);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 7ecce0ba9..29c2e3ce0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -40,6 +40,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -54,7 +55,6 @@
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -68,7 +68,7 @@
     public int port = 13645;
 
     public EventLoopGroup eventLoopGroup;
-    public OrderedSafeExecutor executor;
+    public OrderedExecutor executor;
     private ScheduledExecutorService scheduler;
 
     @Before
@@ -85,7 +85,7 @@ public void setUp() throws Exception {
         bs = new BookieServer(conf);
         bs.start();
         eventLoopGroup = new NioEventLoopGroup();
-        executor = OrderedSafeExecutor.newBuilder()
+        executor = OrderedExecutor.newBuilder()
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
                 .build();
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 1886c2aeb..c5f7f070d 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -123,6 +123,9 @@
   <Match>
     <Class name="~org\.apache\.bookkeeper\.stats\.generated.*" />
   </Match>
+  <Match>
+    <Class name="~org\.apache\.bookkeeper\.common\.generated.*" />
+  </Match>
 
   <!-- modules under stream/ -->
 
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java
new file mode 100644
index 000000000..66ce59bc3
--- /dev/null
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Microbenchmarks for different executor providers.
+ */
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Threads(16)
+@Fork(1)
+@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+public class OrderedExecutorBenchmark {
+
+    private static Map<String, Supplier<ExecutorService>> providers = ImmutableMap.of( //
+            "JDK-ThreadPool", () -> Executors.newFixedThreadPool(1),
+            "OrderedExecutor", () -> OrderedExecutor.newBuilder().numThreads(1).build(), //
+            "OrderedScheduler", () -> OrderedScheduler.newSchedulerBuilder().numThreads(1).build());
+
+    @State(Scope.Benchmark)
+    public static class TestState {
+        @Param({ "JDK-ThreadPool", "OrderedExecutor", "OrderedScheduler" })
+        private String executorName;
+
+        private ExecutorService executor;
+
+        @Setup(Level.Trial)
+        public void setup() {
+            executor = providers.get(executorName).get();
+        }
+
+        @TearDown(Level.Trial)
+        public void teardown() {
+            executor.shutdown();
+        }
+    }
+
+    @Benchmark
+    public void submitAndWait(TestState s) throws Exception {
+        s.executor.submit(() -> {
+        }).get();
+    }
+}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 6adcbc420..3c8dfb7aa 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -391,6 +391,7 @@ public String getStreamName() {
         return readInternal(1, 0, TimeUnit.MILLISECONDS).thenApply(READ_NEXT_MAP_FUNCTION);
     }
 
+    @Override
     public synchronized CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries) {
         return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
     }
@@ -477,7 +478,7 @@ public synchronized void scheduleBackgroundRead() {
         long prevCount = scheduleCountUpdater.getAndIncrement(this);
         if (0 == prevCount) {
             scheduleDelayStopwatch.reset().start();
-            scheduler.submitOrdered(streamName, this);
+            scheduler.executeOrdered(streamName, this);
         }
     }
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index a0dd6ad12..c9bca4440 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -394,7 +394,7 @@ private void orderedSubmit(SafeRunnable runnable) {
             }
         }
         try {
-            scheduler.submitOrdered(streamName, runnable);
+            scheduler.executeOrdered(streamName, runnable);
         } catch (RejectedExecutionException ree) {
             logger.debug("Failed to submit and execute an operation for readhead entry reader of {}",
                     streamName, ree);
@@ -449,7 +449,7 @@ synchronized boolean isClosed() {
         // use runnable here instead of CloseableRunnable,
         // because we need this to be executed
         try {
-            scheduler.submitOrdered(streamName, () -> unsafeAsyncClose(closeFuture));
+            scheduler.executeOrdered(streamName, () -> unsafeAsyncClose(closeFuture));
         } catch (RejectedExecutionException ree) {
             logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}",
                     streamName, ree);
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
index 5a9ee1a6c..d78b4550f 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -212,7 +212,7 @@ protected void submitTask(Object key, SafeRunnable r) {
             if (closed) {
                 return;
             }
-            scheduler.submitOrdered(key, r);
+            scheduler.executeOrdered(key, r);
         } finally {
             closeLock.readLock().unlock();
         }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index 898c11382..813e9742a 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -691,7 +691,7 @@ private void processReadRequests() {
 
         long prevCount = scheduleCountUpdater.getAndIncrement(this);
         if (0 == prevCount) {
-            scheduler.submitOrdered(getSegment().getLogSegmentId(), this);
+            scheduler.executeOrdered(getSegment().getLogSegmentId(), this);
         }
     }
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index 7948084fa..107c5e317 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -135,6 +135,7 @@ private synchronized void checkLockState() throws LockingException {
      * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter
      * list--is executed synchronously, but the lock wait itself doesn't block.
      */
+    @Override
     public synchronized CompletableFuture<ZKDistributedLock> asyncAcquire() {
         if (null != lockAcquireFuture) {
             return FutureUtils.exception(
@@ -145,7 +146,7 @@ private synchronized void checkLockState() throws LockingException {
             if (null == throwable || !(throwable instanceof CancellationException)) {
                 return;
             }
-            lockStateExecutor.submitOrdered(lockPath, () -> asyncClose());
+            lockStateExecutor.executeOrdered(lockPath, () -> asyncClose());
         });
         final Stopwatch stopwatch = Stopwatch.createStarted();
         promise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
@@ -165,7 +166,7 @@ public void onFailure(Throwable cause) {
             }
         });
         this.lockAcquireFuture = promise;
-        lockStateExecutor.submitOrdered(
+        lockStateExecutor.executeOrdered(
             lockPath, () -> doAsyncAcquireWithSemaphore(promise, lockTimeout));
         return promise;
     }
@@ -293,6 +294,7 @@ public void onExpired() {
      *
      * @throws LockingException     if the lock attempt fails
      */
+    @Override
     public synchronized void checkOwnershipAndReacquire() throws LockingException {
         if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
             throw new LockingException(lockPath, "check ownership before acquiring");
@@ -315,6 +317,7 @@ public synchronized void checkOwnershipAndReacquire() throws LockingException {
      *
      * @throws LockingException     if the lock attempt fails
      */
+    @Override
     public synchronized void checkOwnership() throws LockingException {
         if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
             throw new LockingException(lockPath, "check ownership before acquiring");
@@ -426,7 +429,7 @@ private void complete() {
                 FutureUtils.complete(closePromise, null);
             }
         }, lockStateExecutor.chooseThread(lockPath));
-        lockStateExecutor.submitOrdered(
+        lockStateExecutor.executeOrdered(
             lockPath, () -> closeWaiter(lockWaiter, closeWaiterFuture));
         return closePromise;
     }
@@ -434,7 +437,7 @@ private void complete() {
     void internalReacquireLock(final AtomicInteger numRetries,
                                final long lockTimeout,
                                final CompletableFuture<ZKDistributedLock> reacquirePromise) {
-        lockStateExecutor.submitOrdered(
+        lockStateExecutor.executeOrdered(
             lockPath, () -> doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise));
     }
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index 3d7933631..84c516c34 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -168,6 +168,7 @@ public static String getLockIdFromPath(String path) {
     }
 
     static final Comparator<String> MEMBER_COMPARATOR = new Comparator<String>() {
+        @Override
         public int compare(String o1, String o2) {
             int l1 = parseMemberID(o1);
             int l2 = parseMemberID(o2);
@@ -374,6 +375,7 @@ State getLockState() {
         return lockId;
     }
 
+    @Override
     public boolean isLockExpired() {
         return lockState.isExpiredOrClosing();
     }
@@ -392,7 +394,7 @@ public boolean isLockHeld() {
      *          function to execute a lock action
      */
     protected void executeLockAction(final int lockEpoch, final LockAction func) {
-        lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+        lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
                 if (getEpoch() == lockEpoch) {
@@ -431,7 +433,7 @@ public void safeRun() {
      */
     protected <T> void executeLockAction(final int lockEpoch,
                                          final LockAction func, final CompletableFuture<T> promise) {
-        lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+        lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
                 int currentEpoch = getEpoch();
@@ -554,7 +556,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta
                 @Override
                 public void processResult(final int rc, String path, Object ctx,
                                           final List<String> children, Stat stat) {
-                    lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+                    lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
                         @Override
                         public void safeRun() {
                             if (!lockState.inState(State.INIT)) {
@@ -648,7 +650,7 @@ public void onFailure(Throwable cause) {
     private boolean checkOrClaimLockOwner(final Pair<String, Long> currentOwner,
                                           final CompletableFuture<String> result) {
         if (lockId.compareTo(currentOwner) != 0 && !lockContext.hasLockId(currentOwner)) {
-            lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+            lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     result.complete(currentOwner.getLeft());
@@ -878,7 +880,7 @@ synchronized LockWaiter waitForTry(Stopwatch stopwatch, CompletableFuture<LockWa
         // Use lock executor here rather than lock action, because we want this opertaion to be applied
         // whether the epoch has changed or not. The member node is EPHEMERAL_SEQUENTIAL so there's no
         // risk of an ABA problem where we delete and recreate a node and then delete it again here.
-        lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+        lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
                 acquireFuture.completeExceptionally(cause);
@@ -980,7 +982,7 @@ private void deleteLockNode(final CompletableFuture<Void> promise) {
         zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(final int rc, final String path, Object ctx) {
-                lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+                lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         if (KeeperException.Code.OK.intValue() == rc) {
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
index 517b0dfcd..437bb6fab 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
@@ -65,6 +65,7 @@
     private BookKeeperClient bkc;
     private ZooKeeperClient zkc;
 
+    @Override
     @Before
     public void setup() throws Exception {
         super.setup();
@@ -94,6 +95,7 @@ public void setup() throws Exception {
                 .build();
     }
 
+    @Override
     @After
     public void teardown() throws Exception {
         if (null != bkc) {
@@ -138,15 +140,8 @@ private ReadAheadEntryReader createEntryReader(String streamName,
 
     private void ensureOrderSchedulerEmpty(String streamName) throws Exception {
         final CompletableFuture<Void> promise = new CompletableFuture<Void>();
-        scheduler.submitOrdered(streamName, () -> {
+        scheduler.executeOrdered(streamName, () -> {
             FutureUtils.complete(promise, null);
-            // the following line is needed for oraclejdk9 to avoid following exception
-            // ```
-            // incompatible types: inference variable T has incompatible bounds
-            // upper bounds: java.lang.Object
-            // lower bounds: void
-            // ```
-            return (Void) null;
         });
         Utils.ioResult(promise);
     }
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
index 901d2a04c..8a9d9c2f8 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
@@ -779,7 +779,7 @@ public void onExpired() {
         // expire session
         ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
         // submit a runnable to lock state executor to ensure any state changes happened when session expired
-        lockStateExecutor.submitOrdered(lockPath, () -> expiredLatch.countDown());
+        lockStateExecutor.executeOrdered(lockPath, () -> expiredLatch.countDown());
         expiredLatch.await();
         // no watcher was registered if never acquired lock successfully
         assertEquals(State.INIT, lock.getLockState());
@@ -1216,7 +1216,7 @@ private void testLockWhenSiblingUseSameLockId(long timeout, final boolean isUnlo
         } else {
             ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
             final CountDownLatch latch = new CountDownLatch(1);
-            lockStateExecutor.submitOrdered(lockPath, () -> latch.countDown());
+            lockStateExecutor.executeOrdered(lockPath, () -> latch.countDown());
             latch.await();
             children = getLockWaiters(zkc, lockPath);
             assertEquals(0, children.size());
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
index 6155cba5e..39e00c06c 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
@@ -79,7 +79,7 @@ public void close() {
 
     private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
         CompletableFuture<T> future = FutureUtils.createFuture();
-        scheduler.submitOrdered(scId, () -> {
+        scheduler.executeOrdered(scId, () -> {
             future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
         });
         return future;
diff --git a/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java b/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java
index e462d33d6..eba29f14a 100644
--- a/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java
+++ b/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java
@@ -50,7 +50,7 @@ public void testGuavaShadedPath() throws Exception {
 
     @Test
     public void testBookKeeperCommon() throws Exception {
-        Class.forName("org.apache.bookkeeper.util.OrderedSafeExecutor");
+        Class.forName("org.apache.bookkeeper.common.util.OrderedExecutor");
         assertTrue(true);
     }
 
diff --git a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
index 45be333af..c4519dfdf 100644
--- a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
+++ b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
@@ -85,13 +85,13 @@ public void testZooKeeperShadedPath() throws Exception {
 
     @Test(expected = ClassNotFoundException.class)
     public void testBookKeeperCommon() throws Exception {
-        Class.forName("org.apache.bookkeeper.util.OrderedSafeExecutor");
+        Class.forName("org.apache.bookkeeper.common.util.OrderedExecutor");
         assertTrue(true);
     }
 
     @Test
     public void testBookKeeperCommonShade() throws Exception {
-        Class.forName("dlshade.org.apache.bookkeeper.util.OrderedSafeExecutor");
+        Class.forName("dlshade.org.apache.bookkeeper.common.util.OrderedExecutor");
         assertTrue(true);
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services