You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2018/04/03 03:03:20 UTC

[bookkeeper] branch master updated: Refactored OrderedSafeExecutor and OrderedScheduler

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 46171e6  Refactored OrderedSafeExecutor and OrderedScheduler
46171e6 is described below

commit 46171e67e526702487641438144f28b7eb1aa07b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Apr 2 20:03:12 2018 -0700

    Refactored OrderedSafeExecutor and OrderedScheduler
    
    As outlined in https://lists.apache.org/thread.html/102383ea42f473f36720637e41af0ee83fc38d9f992736e0d1a7f985%3Cdev.bookkeeper.apache.org%3E
    
    right now `OrderedSafeExecutor` is implemented on top of `OrderedScheduler`.  There are few problems with this approach that are causing impact on performance:
    
     1. `OrderedScheduler` is a `ScheduledExecutorService` which uses a priority queue for tasks. The priority queue has a single mutex for both publishers/consumers on the queue
     2. There are many objects created for each task submission, due to listenable future decorators
    
    Since in all cases in critical write/read path we don't need delay task execution or futures, we should try to have a light weight execution for that.
    
    ### Modifications
    
     * Inverted the hierarchy between `OrderedSafeExecutor` and `OrderedScheduler`. Now the base class is `OrderedSafeExecutor` and the other extends from it, since it provides additional methods.
     * Moved `OrderedSafeExecutor` in `bookkeeper-common` since `OrderedScheduler` was already there.
     * Moved `OrderedSafeGenericCallback` in its own file, since it needs to be in `bookkeeper-server` module at this point.
     * Changed some method names from `submitOrdered()` into `executeOrdered()` to be consistent with JDK name (`submit()` returns a future while `execute()` returns void).
     * Changed `BookKeeper` instance of `scheduler` into `OrderedScheduler` so that the few cases which were using the `mainWorkerPool` could be easily converted to use the scheduler instead.
    
    Author: Matteo Merli <mm...@apache.org>
    
    Reviewers: Andrey Yegorov <None>, Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1309 from merlimat/refactor-ordered-executor and squashes the following commits:
    
    9af6eadc2 [Matteo Merli] Fixed findbugs configuration to ignore genereted JMH code
    c85ad21e5 [Matteo Merli] Use ImmutableMap
    22eba18c8 [Matteo Merli] Added microbenchmark
    2c67d0e56 [Matteo Merli] Fix for TestMaxSizeWorkersQueue
    54a72ba1b [Matteo Merli] Fix for TestReadAheadEntryReader
    019d5202d [Matteo Merli] More fixes for submitOrdered
    b8175f311 [Matteo Merli] Fixed array initialization type
    5d68d431d [Matteo Merli] Fixed class names in integration tests
    7c529dde7 [Matteo Merli] Fixed submitOrdered in DL code
    c91259b7d [Matteo Merli] Removed "final" modifier on checkQueue
    c5f70d5b4 [Matteo Merli] Merge remote-tracking branch 'apache/master' into refactor-ordered-executor
    957c98309 [Matteo Merli] Correctly handle number of tasks in queue check
    e45f78c09 [Matteo Merli] Renamed into OrderedExecutor and OrderedGenericCallback
    926868429 [Matteo Merli] Refactored OrderedSafeExecutor and OrderedScheduler
---
 .../apache/bookkeeper/benchmark/BenchBookie.java   |   4 +-
 .../common/util/BoundedExecutorService.java        | 109 +++++
 .../util/BoundedScheduledExecutorService.java      |  30 +-
 ...{OrderedScheduler.java => OrderedExecutor.java} | 457 +++++++--------------
 .../bookkeeper/common/util/OrderedScheduler.java   | 431 ++-----------------
 .../org/apache/bookkeeper/common/util/Retries.java |   2 +-
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   9 +-
 .../org/apache/bookkeeper/client/BookKeeper.java   |  16 +-
 .../bookkeeper/client/ExplicitLacFlushPolicy.java  |   2 +-
 .../apache/bookkeeper/client/LedgerDeleteOp.java   |   4 +-
 .../client/LedgerFragmentReplicator.java           |   4 +-
 .../org/apache/bookkeeper/client/LedgerHandle.java |  29 +-
 .../apache/bookkeeper/client/LedgerHandleAdv.java  |   2 +-
 .../org/apache/bookkeeper/client/LedgerOpenOp.java |   4 +-
 .../org/apache/bookkeeper/client/PendingAddOp.java |   2 +-
 .../apache/bookkeeper/client/PendingReadOp.java    |   2 +-
 .../bookkeeper/client/ReadOnlyLedgerHandle.java    |   2 +-
 .../org/apache/bookkeeper/proto/BookieClient.java  |  16 +-
 .../bookkeeper/proto/BookieRequestProcessor.java   |  41 +-
 .../bookkeeper/proto/PerChannelBookieClient.java   |  18 +-
 .../bookkeeper/util/OrderedGenericCallback.java    |  77 ++++
 .../bookkeeper/util/OrderedSafeExecutor.java       | 280 -------------
 .../bookkeeper/client/MockBookKeeperTestCase.java  |  28 +-
 .../client/TestGetBookieInfoTimeout.java           |   6 +-
 .../bookkeeper/proto/TestBackwardCompatCMS42.java  |   6 +-
 .../proto/TestPerChannelBookieClient.java          |  16 +-
 .../apache/bookkeeper/test/BookieClientTest.java   |   6 +-
 .../main/resources/bookkeeper/findbugsExclude.xml  |   3 +
 .../common/OrderedExecutorBenchmark.java           |  82 ++++
 .../apache/distributedlog/BKAsyncLogReader.java    |   3 +-
 .../distributedlog/ReadAheadEntryReader.java       |   4 +-
 .../impl/ZKLogSegmentMetadataStore.java            |   2 +-
 .../impl/logsegment/BKLogSegmentEntryReader.java   |   2 +-
 .../distributedlog/lock/ZKDistributedLock.java     |  11 +-
 .../apache/distributedlog/lock/ZKSessionLock.java  |  14 +-
 .../distributedlog/TestReadAheadEntryReader.java   |  11 +-
 .../distributedlog/lock/TestZKSessionLock.java     |   4 +-
 .../impl/sc/FailRequestStorageContainer.java       |   2 +-
 .../shaded/BookKeeperServerShadedJarTest.java      |   2 +-
 .../shaded/DistributedLogCoreShadedJarTest.java    |   4 +-
 40 files changed, 610 insertions(+), 1137 deletions(-)

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index db8ccbf..3efff7a 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -38,7 +39,6 @@ import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -165,7 +165,7 @@ public class BenchBookie {
             eventLoop = new NioEventLoopGroup();
         }
 
-        OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+        OrderedExecutor executor = OrderedExecutor.newBuilder()
                 .name("BenchBookieClientScheduler")
                 .numThreads(1)
                 .build();
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java
new file mode 100644
index 0000000..655d49d
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implements {@link ExecutorService} and allows limiting the number of tasks to
+ * be scheduled in the thread's queue.
+ */
+public class BoundedExecutorService extends ForwardingExecutorService {
+    private final BlockingQueue<Runnable> queue;
+    private final ThreadPoolExecutor thread;
+    private final int maxTasksInQueue;
+
+    public BoundedExecutorService(ThreadPoolExecutor thread, int maxTasksInQueue) {
+        this.queue = thread.getQueue();
+        this.thread = thread;
+        this.maxTasksInQueue = maxTasksInQueue;
+    }
+
+    @Override
+    protected ExecutorService delegate() {
+        return this.thread;
+    }
+
+    private void checkQueue(int numberOfTasks) {
+        if (maxTasksInQueue > 0 && (queue.size() + numberOfTasks) > maxTasksInQueue) {
+            throw new RejectedExecutionException("Queue at limit of " + maxTasksInQueue + " items");
+        }
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        checkQueue(tasks.size());
+        return super.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        checkQueue(tasks.size());
+        return super.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        checkQueue(tasks.size());
+        return super.invokeAny(tasks);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        checkQueue(tasks.size());
+        return super.invokeAny(tasks, timeout, unit);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        checkQueue(1);
+        super.execute(command);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        checkQueue(1);
+        return super.submit(task);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        checkQueue(1);
+        return super.submit(task);
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        checkQueue(1);
+        return super.submit(task, result);
+    }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
index 482ca18..44c6f38 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
@@ -59,83 +59,83 @@ public class BoundedScheduledExecutorService extends ForwardingListeningExecutor
 
     @Override
     public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-        this.checkQueue();
+        this.checkQueue(1);
         return this.thread.schedule(command, delay, unit);
     }
 
     @Override
     public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-        this.checkQueue();
+        this.checkQueue(1);
         return this.thread.schedule(callable, delay, unit);
     }
 
     @Override
     public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                             long initialDelay, long period, TimeUnit unit) {
-        this.checkQueue();
+        this.checkQueue(1);
         return this.thread.scheduleAtFixedRate(command, initialDelay, period, unit);
     }
 
     @Override
     public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                                long initialDelay, long delay, TimeUnit unit) {
-        this.checkQueue();
+        this.checkQueue(1);
         return this.thread.scheduleAtFixedRate(command, initialDelay, delay, unit);
     }
 
     @Override
     public <T> ListenableFuture<T> submit(Callable<T> task) {
-        this.checkQueue();
+        this.checkQueue(1);
         return super.submit(task);
     }
 
     @Override
     public ListenableFuture<?> submit(Runnable task) {
-        this.checkQueue();
+        this.checkQueue(1);
         return super.submit(task);
     }
 
     @Override
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
-        this.checkQueue();
+        this.checkQueue(tasks.size());
         return super.invokeAll(tasks);
     }
 
     @Override
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                          long timeout, TimeUnit unit) throws InterruptedException {
-        this.checkQueue();
+        this.checkQueue(tasks.size());
         return super.invokeAll(tasks, timeout, unit);
     }
 
     @Override
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
-        this.checkQueue();
+        this.checkQueue(tasks.size());
         return super.invokeAny(tasks);
     }
 
     @Override
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
                            TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-        this.checkQueue();
+        this.checkQueue(tasks.size());
         return super.invokeAny(tasks, timeout, unit);
     }
 
     @Override
     public <T> ListenableFuture<T> submit(Runnable task, T result) {
-        this.checkQueue();
+        this.checkQueue(1);
         return super.submit(task, result);
     }
 
     @Override
     public void execute(Runnable command) {
-        this.checkQueue();
+        this.checkQueue(1);
         super.execute(command);
     }
 
-    private void checkQueue() {
-        if (this.maxTasksInQueue > 0 && this.queue.size() >= this.maxTasksInQueue) {
-            throw new RejectedExecutionException("Queue at limit of " + this.maxTasksInQueue + " items");
+    private void checkQueue(int numberOfTasks) {
+        if (maxTasksInQueue > 0 && (queue.size() + numberOfTasks) > maxTasksInQueue) {
+            throw new RejectedExecutionException("Queue at limit of " + maxTasksInQueue + " items");
         }
     }
 
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
similarity index 54%
copy from bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
index bf2a6fb..0f634d0 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,7 +20,7 @@ package org.apache.bookkeeper.common.util;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -31,13 +31,16 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -45,7 +48,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.lang.StringUtils;
 
 /**
- * This class provides 2 things over the java {@link ScheduledExecutorService}.
+ * This class provides 2 things over the java {@link ExecutorService}.
  *
  * <p>1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
  * This means that exceptions in scheduled tasks wont go unnoticed and will be
@@ -58,12 +61,13 @@ import org.apache.commons.lang.StringUtils;
  * achieved by hashing the key objects to threads by their {@link #hashCode()}
  * method.
  */
-public class OrderedScheduler implements ScheduledExecutorService {
+@Slf4j
+public class OrderedExecutor implements ExecutorService {
     public static final int NO_TASK_LIMIT = -1;
     protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
 
     final String name;
-    final ListeningScheduledExecutorService threads[];
+    final ExecutorService threads[];
     final long threadIds[];
     final Random rand = new Random();
     final OpStatsLogger taskExecutionStats;
@@ -72,24 +76,30 @@ public class OrderedScheduler implements ScheduledExecutorService {
     final long warnTimeMicroSec;
     final int maxTasksInQueue;
 
-    /**
-     * Create a builder to build ordered scheduler.
-     *
-     * @return builder to build ordered scheduler.
-     */
-    public static SchedulerBuilder newSchedulerBuilder() {
-        return new SchedulerBuilder();
+
+    public static Builder newBuilder() {
+        return new Builder();
     }
 
     /**
-     * Builder to build ordered scheduler.
+     * A builder class for an OrderedExecutor.
      */
-    public static class SchedulerBuilder extends AbstractBuilder<OrderedScheduler> {}
+    public static class Builder extends AbstractBuilder<OrderedExecutor> {
+
+        @Override
+        public OrderedExecutor build() {
+            if (null == threadFactory) {
+                threadFactory = new DefaultThreadFactory("bookkeeper-ordered-safe-executor");
+            }
+            return new OrderedExecutor(name, numThreads, threadFactory, statsLogger,
+                                           traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
+        }
+    }
 
     /**
      * Abstract builder class to build {@link OrderedScheduler}.
      */
-    public abstract static class AbstractBuilder<T extends OrderedScheduler> {
+    public abstract static class AbstractBuilder<T extends OrderedExecutor> {
         protected String name = getClass().getSimpleName();
         protected int numThreads = Runtime.getRuntime().availableProcessors();
         protected ThreadFactory threadFactory = null;
@@ -138,7 +148,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
             if (null == threadFactory) {
                 threadFactory = new DefaultThreadFactory(name);
             }
-            return (T) new OrderedScheduler(
+            return (T) new OrderedExecutor(
                 name,
                 numThreads,
                 threadFactory,
@@ -147,34 +157,42 @@ public class OrderedScheduler implements ScheduledExecutorService {
                 warnTimeMicroSec,
                 maxTasksInQueue);
         }
-
     }
 
-    private class TimedRunnable implements SafeRunnable {
-        final SafeRunnable runnable;
+    /**
+     * Decorator class for a runnable that measure the execution time.
+     */
+    protected class TimedRunnable implements Runnable {
+        final Runnable runnable;
         final long initNanos;
 
-        TimedRunnable(SafeRunnable runnable) {
+        TimedRunnable(Runnable runnable) {
             this.runnable = runnable;
             this.initNanos = MathUtils.nowInNano();
          }
 
         @Override
-        public void safeRun() {
-            taskPendingStats.registerSuccessfulEvent(
-                    MathUtils.elapsedNanos(initNanos),
-                    TimeUnit.NANOSECONDS);
+        public void run() {
+            taskPendingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(initNanos), TimeUnit.NANOSECONDS);
             long startNanos = MathUtils.nowInNano();
-            this.runnable.safeRun();
+            this.runnable.run();
             long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
             taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
             if (elapsedMicroSec >= warnTimeMicroSec) {
-                LOGGER.warn("Runnable {}:{} took too long {} micros to execute.",
-                        runnable, runnable.getClass(), elapsedMicroSec);
+                log.warn("Runnable {}:{} took too long {} micros to execute.", runnable, runnable.getClass(),
+                        elapsedMicroSec);
             }
         }
     }
 
+    protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
+        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
+    }
+
+    protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
+        return new BoundedExecutorService(executor, this.maxTasksInQueue);
+    }
+
     /**
      * Constructs Safe executor.
      *
@@ -190,37 +208,30 @@ public class OrderedScheduler implements ScheduledExecutorService {
      *            - should we stat task execution
      * @param warnTimeMicroSec
      *            - log long task exec warning after this interval
+     * @param maxTasksInQueue
+     *            - maximum items allowed in a thread queue. -1 for no limit
      */
-    protected OrderedScheduler(String baseName,
-                               int numThreads,
-                               ThreadFactory threadFactory,
-                               StatsLogger statsLogger,
-                               boolean traceTaskExecution,
-                               long warnTimeMicroSec,
-                               int maxTasksInQueue) {
+    protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
+                                StatsLogger statsLogger, boolean traceTaskExecution,
+                                long warnTimeMicroSec, int maxTasksInQueue) {
         checkArgument(numThreads > 0);
         checkArgument(!StringUtils.isBlank(baseName));
 
         this.maxTasksInQueue = maxTasksInQueue;
         this.warnTimeMicroSec = warnTimeMicroSec;
         name = baseName;
-        threads = new ListeningScheduledExecutorService[numThreads];
+        threads = new ExecutorService[numThreads];
         threadIds = new long[numThreads];
         for (int i = 0; i < numThreads; i++) {
-            final ScheduledThreadPoolExecutor thread = new ScheduledThreadPoolExecutor(1,
-                    new ThreadFactoryBuilder()
-                        .setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
-                        .setThreadFactory(threadFactory)
-                        .build());
-            threads[i] = new BoundedScheduledExecutorService(thread, this.maxTasksInQueue);
+            ThreadPoolExecutor thread = createSingleThreadExecutor(
+                    new ThreadFactoryBuilder().setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
+                    .setThreadFactory(threadFactory).build());
+            threads[i] = getBoundedExecutor(thread);
 
             final int idx = i;
             try {
-                threads[idx].submit(new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        threadIds[idx] = Thread.currentThread().getId();
-                    }
+                threads[idx].submit(() -> {
+                    threadIds[idx] = Thread.currentThread().getId();
                 }).get();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -271,7 +282,58 @@ public class OrderedScheduler implements ScheduledExecutorService {
         this.traceTaskExecution = traceTaskExecution;
     }
 
-    public ListeningScheduledExecutorService chooseThread() {
+    /**
+     * Schedules a one time action to execute with an ordering guarantee on the key.
+     * @param orderingKey
+     * @param r
+     */
+    public void executeOrdered(Object orderingKey, SafeRunnable r) {
+        chooseThread(orderingKey).execute(timedRunnable(r));
+    }
+
+    /**
+     * Schedules a one time action to execute with an ordering guarantee on the key.
+     * @param orderingKey
+     * @param r
+     */
+    public void executeOrdered(long orderingKey, SafeRunnable r) {
+        chooseThread(orderingKey).execute(timedRunnable(r));
+    }
+
+    /**
+     * Schedules a one time action to execute with an ordering guarantee on the key.
+     * @param orderingKey
+     * @param r
+     */
+    public void executeOrdered(int orderingKey, SafeRunnable r) {
+        chooseThread(orderingKey).execute(timedRunnable(r));
+    }
+
+    public <T> ListenableFuture<T> submitOrdered(long orderingKey, Callable<T> task) {
+        SettableFuture<T> future = SettableFuture.create();
+        executeOrdered(orderingKey, () -> {
+            try {
+                T result = task.call();
+                future.set(result);
+            } catch (Throwable t) {
+                future.setException(t);
+            }
+        });
+
+        return future;
+    }
+
+
+    public long getThreadID(long orderingKey) {
+        // skip hashcode generation in this special case
+        if (threadIds.length == 1) {
+            return threadIds[0];
+        }
+
+        return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
+    }
+
+    public ExecutorService chooseThread() {
         // skip random # generation in this special case
         if (threads.length == 1) {
             return threads[0];
@@ -280,7 +342,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
         return threads[rand.nextInt(threads.length)];
     }
 
-    public ListeningScheduledExecutorService chooseThread(Object orderingKey) {
+    public ExecutorService chooseThread(Object orderingKey) {
         // skip hashcode generation in this special case
         if (threads.length == 1) {
             return threads[0];
@@ -295,7 +357,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
      * @param orderingKey long ordering key
      * @return the thread for executing this order key
      */
-    public ListeningScheduledExecutorService chooseThread(long orderingKey) {
+    public ExecutorService chooseThread(long orderingKey) {
         if (threads.length == 1) {
             return threads[0];
         }
@@ -303,7 +365,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
         return threads[MathUtils.signSafeMod(orderingKey, threads.length)];
     }
 
-    private SafeRunnable timedRunnable(SafeRunnable r) {
+    private Runnable timedRunnable(Runnable r) {
         if (traceTaskExecution) {
             return new TimedRunnable(r);
         } else {
@@ -312,174 +374,75 @@ public class OrderedScheduler implements ScheduledExecutorService {
     }
 
     /**
-     * schedules a one time action to execute.
-     */
-    public void submit(SafeRunnable r) {
-        chooseThread().submit(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
-     *
-     * @param orderingKey order key to submit the task
-     * @param r task to run
-     * @return listenable future on the completion of the task
-     */
-    public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
-        return chooseThread(orderingKey).submit(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
-     *
-     * @param orderingKey order key to submit the task
-     * @param r task to run
-     */
-    public void executeOrdered(Object orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the key.
-     *
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(long orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the key.
-     *
-     * @param orderingKey
-     * @param r
+     * {@inheritDoc}
      */
-    public void submitOrdered(int orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return chooseThread().submit(task);
     }
 
     /**
-     * schedules a one time action to execute with an ordering guarantee on the key.
-     *
-     * @param orderingKey
-     * @param callable
+     * {@inheritDoc}
      */
-    public <T> ListenableFuture<T> submitOrdered(Object orderingKey,
-                                                 Callable<T> callable) {
-        return chooseThread(orderingKey).submit(callable);
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return chooseThread().submit(timedRunnable(task), result);
     }
 
     /**
-     * Creates and executes a one-shot action that becomes enabled after the given delay.
-     *
-     * @param command - the SafeRunnable to execute
-     * @param delay - the time from now to delay execution
-     * @param unit - the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of the task and whose get() method
-     *         will return null upon completion
+     * {@inheritDoc}
      */
-    public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
-        return chooseThread().schedule(command, delay, unit);
+    @Override
+    public Future<?> submit(Runnable task) {
+        return chooseThread().submit(timedRunnable(task));
     }
 
     /**
-     * Creates and executes a one-shot action that becomes enabled after the given delay.
-     *
-     * @param orderingKey - the key used for ordering
-     * @param command - the SafeRunnable to execute
-     * @param delay - the time from now to delay execution
-     * @param unit - the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of the task and whose get() method
-     *         will return null upon completion
+     * {@inheritDoc}
      */
-    public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
-        return chooseThread(orderingKey).schedule(command, delay, unit);
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException {
+        return chooseThread().invokeAll(tasks);
     }
 
     /**
-     * Creates and executes a periodic action that becomes enabled first after
-     * the given initial delay, and subsequently with the given period.
-     *
-     * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
-     *
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param period - the period between successive executions
-     * @param unit - the time unit of the initialDelay and period parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get()
-     * method will throw an exception upon cancellation
+     * {@inheritDoc}
      */
-    public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
-        return chooseThread().scheduleAtFixedRate(command, initialDelay, period, unit);
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+                                         long timeout,
+                                         TimeUnit unit)
+        throws InterruptedException {
+        return chooseThread().invokeAll(tasks, timeout, unit);
     }
 
     /**
-     * Creates and executes a periodic action that becomes enabled first after
-     * the given initial delay, and subsequently with the given period.
-     *
-     * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
-     *
-     * @param orderingKey - the key used for ordering
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param period - the period between successive executions
-     * @param unit - the time unit of the initialDelay and period parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
-     * will throw an exception upon cancellation
+     * {@inheritDoc}
      */
-    public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
-            long period, TimeUnit unit) {
-        return chooseThread(orderingKey).scheduleAtFixedRate(command, initialDelay, period, unit);
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException, ExecutionException {
+        return chooseThread().invokeAny(tasks);
     }
 
     /**
-     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
-     * with the given delay between the termination of one execution and the commencement of the next.
-     *
-     * <p>For more details check {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}
-     * .
-     *
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param delay - the delay between the termination of one execution and the commencement of the next
-     * @param unit - the time unit of the initialDelay and delay parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
-     * will throw an exception upon cancellation
+     * {@inheritDoc}
      */
-    public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
-            TimeUnit unit) {
-        return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        return chooseThread().invokeAny(tasks, timeout, unit);
     }
 
     /**
-     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
-     * with the given delay between the termination of one execution and the commencement of the next.
-     *
-     * <p>For more details check {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}
-     * .
-     *
-     * @param orderingKey - the key used for ordering
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param delay - the delay between the termination of one execution and the commencement of the next
-     * @param unit - the time unit of the initialDelay and delay parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
-     * will throw an exception upon cancellation
+     * {@inheritDoc}
      */
-    public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
-            long delay, TimeUnit unit) {
-        return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    @Override
+    public void execute(Runnable command) {
+        chooseThread().execute(command);
     }
 
-    protected long getThreadID(long orderingKey) {
-        // skip hashcode generation in this special case
-        if (threadIds.length == 1) {
-            return threadIds[0];
-        }
-
-        return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
-    }
 
     /**
      * {@inheritDoc}
@@ -497,7 +460,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
     @Override
     public List<Runnable> shutdownNow() {
         List<Runnable> runnables = new ArrayList<Runnable>();
-        for (ScheduledExecutorService executor : threads) {
+        for (ExecutorService executor : threads) {
             runnables.addAll(executor.shutdownNow());
         }
         return runnables;
@@ -508,7 +471,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
      */
     @Override
     public boolean isShutdown() {
-        for (ScheduledExecutorService executor : threads) {
+        for (ExecutorService executor : threads) {
             if (!executor.isShutdown()) {
                 return false;
             }
@@ -533,7 +496,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
      */
     @Override
     public boolean isTerminated() {
-        for (ScheduledExecutorService executor : threads) {
+        for (ExecutorService executor : threads) {
             if (!executor.isTerminated()) {
                 return false;
             }
@@ -558,112 +521,4 @@ public class OrderedScheduler implements ScheduledExecutorService {
         }
     }
 
-    //
-    // Methods for implementing {@link ScheduledExecutorService}
-    //
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-        return chooseThread().schedule(command, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-        return chooseThread().schedule(callable, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
-                                                  long initialDelay, long period, TimeUnit unit) {
-        return chooseThread().scheduleAtFixedRate(command, initialDelay, period, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
-                                                     long initialDelay, long delay, TimeUnit unit) {
-        return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-        return chooseThread().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-        return chooseThread().submit(task, result);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Future<?> submit(Runnable task) {
-        return chooseThread().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-        return chooseThread().invokeAll(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
-                                         long timeout,
-                                         TimeUnit unit)
-        throws InterruptedException {
-        return chooseThread().invokeAll(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException, ExecutionException {
-        return chooseThread().invokeAny(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        return chooseThread().invokeAny(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void execute(Runnable command) {
-        chooseThread().execute(command);
-    }
-
 }
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index bf2a6fb..6f05832 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -17,32 +17,20 @@
  */
 package org.apache.bookkeeper.common.util;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
+
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang.StringUtils;
 
 /**
  * This class provides 2 things over the java {@link ScheduledExecutorService}.
@@ -58,19 +46,7 @@ import org.apache.commons.lang.StringUtils;
  * achieved by hashing the key objects to threads by their {@link #hashCode()}
  * method.
  */
-public class OrderedScheduler implements ScheduledExecutorService {
-    public static final int NO_TASK_LIMIT = -1;
-    protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
-
-    final String name;
-    final ListeningScheduledExecutorService threads[];
-    final long threadIds[];
-    final Random rand = new Random();
-    final OpStatsLogger taskExecutionStats;
-    final OpStatsLogger taskPendingStats;
-    final boolean traceTaskExecution;
-    final long warnTimeMicroSec;
-    final int maxTasksInQueue;
+public class OrderedScheduler extends OrderedExecutor implements ScheduledExecutorService {
 
     /**
      * Create a builder to build ordered scheduler.
@@ -84,61 +60,13 @@ public class OrderedScheduler implements ScheduledExecutorService {
     /**
      * Builder to build ordered scheduler.
      */
-    public static class SchedulerBuilder extends AbstractBuilder<OrderedScheduler> {}
-
-    /**
-     * Abstract builder class to build {@link OrderedScheduler}.
-     */
-    public abstract static class AbstractBuilder<T extends OrderedScheduler> {
-        protected String name = getClass().getSimpleName();
-        protected int numThreads = Runtime.getRuntime().availableProcessors();
-        protected ThreadFactory threadFactory = null;
-        protected StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-        protected boolean traceTaskExecution = false;
-        protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
-        protected int maxTasksInQueue = NO_TASK_LIMIT;
-
-        public AbstractBuilder<T> name(String name) {
-            this.name = name;
-            return this;
-        }
-
-        public AbstractBuilder<T> numThreads(int num) {
-            this.numThreads = num;
-            return this;
-        }
-
-        public AbstractBuilder<T> maxTasksInQueue(int num) {
-            this.maxTasksInQueue = num;
-            return this;
-        }
-
-        public AbstractBuilder<T> threadFactory(ThreadFactory threadFactory) {
-            this.threadFactory = threadFactory;
-            return this;
-        }
-
-        public AbstractBuilder<T> statsLogger(StatsLogger statsLogger) {
-            this.statsLogger = statsLogger;
-            return this;
-        }
-
-        public AbstractBuilder<T> traceTaskExecution(boolean enabled) {
-            this.traceTaskExecution = enabled;
-            return this;
-        }
-
-        public AbstractBuilder<T> traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
-            this.warnTimeMicroSec = warnTimeMicroSec;
-            return this;
-        }
-
-        @SuppressWarnings("unchecked")
-        public T build() {
+    public static class SchedulerBuilder extends OrderedExecutor.AbstractBuilder<OrderedScheduler> {
+        @Override
+        public OrderedScheduler build() {
             if (null == threadFactory) {
                 threadFactory = new DefaultThreadFactory(name);
             }
-            return (T) new OrderedScheduler(
+            return new OrderedScheduler(
                 name,
                 numThreads,
                 threadFactory,
@@ -147,32 +75,6 @@ public class OrderedScheduler implements ScheduledExecutorService {
                 warnTimeMicroSec,
                 maxTasksInQueue);
         }
-
-    }
-
-    private class TimedRunnable implements SafeRunnable {
-        final SafeRunnable runnable;
-        final long initNanos;
-
-        TimedRunnable(SafeRunnable runnable) {
-            this.runnable = runnable;
-            this.initNanos = MathUtils.nowInNano();
-         }
-
-        @Override
-        public void safeRun() {
-            taskPendingStats.registerSuccessfulEvent(
-                    MathUtils.elapsedNanos(initNanos),
-                    TimeUnit.NANOSECONDS);
-            long startNanos = MathUtils.nowInNano();
-            this.runnable.safeRun();
-            long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
-            taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
-            if (elapsedMicroSec >= warnTimeMicroSec) {
-                LOGGER.warn("Runnable {}:{} took too long {} micros to execute.",
-                        runnable, runnable.getClass(), elapsedMicroSec);
-            }
-        }
     }
 
     /**
@@ -191,172 +93,40 @@ public class OrderedScheduler implements ScheduledExecutorService {
      * @param warnTimeMicroSec
      *            - log long task exec warning after this interval
      */
-    protected OrderedScheduler(String baseName,
+    private OrderedScheduler(String baseName,
                                int numThreads,
                                ThreadFactory threadFactory,
                                StatsLogger statsLogger,
                                boolean traceTaskExecution,
                                long warnTimeMicroSec,
                                int maxTasksInQueue) {
-        checkArgument(numThreads > 0);
-        checkArgument(!StringUtils.isBlank(baseName));
-
-        this.maxTasksInQueue = maxTasksInQueue;
-        this.warnTimeMicroSec = warnTimeMicroSec;
-        name = baseName;
-        threads = new ListeningScheduledExecutorService[numThreads];
-        threadIds = new long[numThreads];
-        for (int i = 0; i < numThreads; i++) {
-            final ScheduledThreadPoolExecutor thread = new ScheduledThreadPoolExecutor(1,
-                    new ThreadFactoryBuilder()
-                        .setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
-                        .setThreadFactory(threadFactory)
-                        .build());
-            threads[i] = new BoundedScheduledExecutorService(thread, this.maxTasksInQueue);
-
-            final int idx = i;
-            try {
-                threads[idx].submit(new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        threadIds[idx] = Thread.currentThread().getId();
-                    }
-                }).get();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException("Couldn't start thread " + i, e);
-            } catch (ExecutionException e) {
-                throw new RuntimeException("Couldn't start thread " + i, e);
-            }
-
-            // Register gauges
-            statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
-
-                @Override
-                public Number getSample() {
-                    return thread.getQueue().size();
-                }
-            });
-            statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
+        super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
+    }
 
-                @Override
-                public Number getSample() {
-                    return thread.getCompletedTaskCount();
-                }
-            });
-            statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
 
-                @Override
-                public Number getSample() {
-                    return thread.getTaskCount();
-                }
-            });
-        }
+    @Override
+    protected ScheduledThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
+        return new ScheduledThreadPoolExecutor(1, factory);
+    }
 
-        // Stats
-        this.taskExecutionStats = statsLogger.scope(name).getOpStatsLogger("task_execution");
-        this.taskPendingStats = statsLogger.scope(name).getOpStatsLogger("task_queued");
-        this.traceTaskExecution = traceTaskExecution;
+    @Override
+    protected ListeningScheduledExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
+        return new BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, this.maxTasksInQueue);
     }
 
+    @Override
     public ListeningScheduledExecutorService chooseThread() {
-        // skip random # generation in this special case
-        if (threads.length == 1) {
-            return threads[0];
-        }
-
-        return threads[rand.nextInt(threads.length)];
+        return (ListeningScheduledExecutorService) super.chooseThread();
     }
 
+    @Override
     public ListeningScheduledExecutorService chooseThread(Object orderingKey) {
-        // skip hashcode generation in this special case
-        if (threads.length == 1) {
-            return threads[0];
-        }
-
-        return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)];
+        return (ListeningScheduledExecutorService) super.chooseThread(orderingKey);
     }
 
-    /**
-     * skip hashcode generation in this special case.
-     *
-     * @param orderingKey long ordering key
-     * @return the thread for executing this order key
-     */
+    @Override
     public ListeningScheduledExecutorService chooseThread(long orderingKey) {
-        if (threads.length == 1) {
-            return threads[0];
-        }
-
-        return threads[MathUtils.signSafeMod(orderingKey, threads.length)];
-    }
-
-    private SafeRunnable timedRunnable(SafeRunnable r) {
-        if (traceTaskExecution) {
-            return new TimedRunnable(r);
-        } else {
-            return r;
-        }
-    }
-
-    /**
-     * schedules a one time action to execute.
-     */
-    public void submit(SafeRunnable r) {
-        chooseThread().submit(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
-     *
-     * @param orderingKey order key to submit the task
-     * @param r task to run
-     * @return listenable future on the completion of the task
-     */
-    public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
-        return chooseThread(orderingKey).submit(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
-     *
-     * @param orderingKey order key to submit the task
-     * @param r task to run
-     */
-    public void executeOrdered(Object orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the key.
-     *
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(long orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the key.
-     *
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(int orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
+        return (ListeningScheduledExecutorService) super.chooseThread(orderingKey);
     }
 
     /**
@@ -472,91 +242,6 @@ public class OrderedScheduler implements ScheduledExecutorService {
         return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
     }
 
-    protected long getThreadID(long orderingKey) {
-        // skip hashcode generation in this special case
-        if (threadIds.length == 1) {
-            return threadIds[0];
-        }
-
-        return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void shutdown() {
-        for (int i = 0; i < threads.length; i++) {
-            threads[i].shutdown();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Runnable> shutdownNow() {
-        List<Runnable> runnables = new ArrayList<Runnable>();
-        for (ScheduledExecutorService executor : threads) {
-            runnables.addAll(executor.shutdownNow());
-        }
-        return runnables;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isShutdown() {
-        for (ScheduledExecutorService executor : threads) {
-            if (!executor.isShutdown()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-        boolean ret = true;
-        for (int i = 0; i < threads.length; i++) {
-            ret = ret && threads[i].awaitTermination(timeout, unit);
-        }
-        return ret;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isTerminated() {
-        for (ScheduledExecutorService executor : threads) {
-            if (!executor.isTerminated()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Force threads shutdown (cancel active requests) after specified delay,
-     * to be used after shutdown() rejects new requests.
-     */
-    public void forceShutdown(long timeout, TimeUnit unit) {
-        for (int i = 0; i < threads.length; i++) {
-            try {
-                if (!threads[i].awaitTermination(timeout, unit)) {
-                    threads[i].shutdownNow();
-                }
-            } catch (InterruptedException exception) {
-                threads[i].shutdownNow();
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
 
     //
     // Methods for implementing {@link ScheduledExecutorService}
@@ -596,74 +281,4 @@ public class OrderedScheduler implements ScheduledExecutorService {
         return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-        return chooseThread().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-        return chooseThread().submit(task, result);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Future<?> submit(Runnable task) {
-        return chooseThread().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-        return chooseThread().invokeAll(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
-                                         long timeout,
-                                         TimeUnit unit)
-        throws InterruptedException {
-        return chooseThread().invokeAll(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException, ExecutionException {
-        return chooseThread().invokeAny(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        return chooseThread().invokeAny(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void execute(Runnable command) {
-        chooseThread().execute(command);
-    }
-
 }
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
index 89214e3..6a52ef5 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
@@ -94,7 +94,7 @@ public final class Retries {
                 scheduler,
                 null);
         } else {
-            scheduler.submitOrdered(key, () -> {
+            scheduler.executeOrdered(key, () -> {
                 execute(
                     future,
                     backoffs.iterator(),
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 546a8ba..d5f6c8d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.UpdateLedgerOp;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
@@ -112,7 +113,6 @@ import org.apache.bookkeeper.util.EntryFormatter;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.LedgerIdFormatter;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.Tool;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -814,7 +814,7 @@ public class BookieShell implements Tool {
                 } else {
                     // Use BookieClient to target a specific bookie
                     EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-                    OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+                    OrderedExecutor executor = OrderedExecutor.newBuilder()
                         .numThreads(1)
                         .name("BookieClientScheduler")
                         .build();
@@ -1022,6 +1022,7 @@ public class BookieShell implements Tool {
             return ledgerId;
         }
 
+        @Override
         public void operationComplete(int rc, LedgerMetadata result) {
             if (rc != 0) {
                 setException(BKException.create(rc));
@@ -2489,6 +2490,7 @@ public class BookieShell implements Tool {
             return "Convert bookie indexes from InterleavedStorage to DbLedgerStorage format";
         }
 
+        @Override
         String getUsage() {
             return CMD_CONVERT_TO_DB_STORAGE;
         }
@@ -2584,6 +2586,7 @@ public class BookieShell implements Tool {
             return "Convert bookie indexes from DbLedgerStorage to InterleavedStorage format";
         }
 
+        @Override
         String getUsage() {
             return CMD_CONVERT_TO_INTERLEAVED_STORAGE;
         }
@@ -2699,6 +2702,7 @@ public class BookieShell implements Tool {
             return "Rebuild DbLedgerStorage locations index by scanning the entry logs";
         }
 
+        @Override
         String getUsage() {
             return CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX;
         }
@@ -3116,6 +3120,7 @@ public class BookieShell implements Tool {
         };
 
         return new Iterable<SortedMap<Long, Long>>() {
+            @Override
             public Iterator<SortedMap<Long, Long>> iterator() {
                 return iterator;
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index f0c5249..9862665 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -57,6 +57,8 @@ import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.Feature;
@@ -79,7 +81,6 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.configuration.ConfigurationException;
@@ -134,8 +135,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
     final BookieClient bookieClient;
     final BookieWatcher bookieWatcher;
 
-    final OrderedSafeExecutor mainWorkerPool;
-    final ScheduledExecutorService scheduler;
+    final OrderedExecutor mainWorkerPool;
+    final OrderedScheduler scheduler;
     final HashedWheelTimer requestTimer;
     final boolean ownTimer;
     final FeatureProvider featureProvider;
@@ -421,9 +422,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         this.reorderReadSequence = conf.isReorderReadSequenceEnabled();
 
         // initialize resources
-        this.scheduler = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("BookKeeperClientScheduler"));
-        this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
+        this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
+        this.mainWorkerPool = OrderedExecutor.newBuilder()
                 .name("BookKeeperClientWorker")
                 .numThreads(conf.getNumWorkerThreads())
                 .statsLogger(statsLogger)
@@ -672,12 +672,12 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
     }
 
     @VisibleForTesting
-    OrderedSafeExecutor getMainWorkerPool() {
+    OrderedExecutor getMainWorkerPool() {
         return mainWorkerPool;
     }
 
     @VisibleForTesting
-    ScheduledExecutorService getScheduler() {
+    OrderedScheduler getScheduler() {
         return scheduler;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index c77843a..96c8998 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -116,7 +116,7 @@ interface ExplicitLacFlushPolicy {
                 }
             };
             try {
-                scheduledFuture = lh.bk.getMainWorkerPool().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
+                scheduledFuture = lh.bk.getScheduler().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
                         explicitLacIntervalInMs, explicitLacIntervalInMs, TimeUnit.MILLISECONDS);
             } catch (RejectedExecutionException re) {
                 LOG.error("Scheduling of ExplictLastAddConfirmedFlush for ledger: {} has failed because of {}",
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
index 117e359..45ec425 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
@@ -30,7 +30,7 @@ import org.apache.bookkeeper.client.SyncCallbackUtils.SyncDeleteCallback;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.apache.bookkeeper.versioning.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
  * Encapsulates asynchronous ledger delete operation.
  *
  */
-class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> {
+class LedgerDeleteOp extends OrderedGenericCallback<Void> {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerDeleteOp.class);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 5c2ff8c..3f599e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -46,7 +46,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.apache.zookeeper.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -416,7 +416,7 @@ public class LedgerFragmentReplicator {
                 // try again, the previous success (with which this has
                 // conflicted) will have updated the stat other operations
                 // such as (addEnsemble) would update it too.
-                lh.rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
+                lh.rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(
                                 lh.bk.mainWorkerPool, lh.getId()) {
                             @Override
                             public void safeOperationComplete(int rc,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 8db5667..a9a85bc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -83,7 +83,7 @@ import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.collections4.IteratorUtils;
 import org.slf4j.Logger;
@@ -176,6 +176,7 @@ public class LedgerHandle implements WriteHandle {
         this.bookieFailureHistory = CacheBuilder.newBuilder()
             .expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
             .build(new CacheLoader<BookieSocketAddress, Long>() {
+            @Override
             public Long load(BookieSocketAddress key) {
                 return -1L;
             }
@@ -200,10 +201,12 @@ public class LedgerHandle implements WriteHandle {
         lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
         bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS,
                                           new Gauge<Integer>() {
-                                              public Integer getDefaultValue() {
+                                              @Override
+                                            public Integer getDefaultValue() {
                                                   return 0;
                                               }
-                                              public Integer getSample() {
+                                              @Override
+                                            public Integer getSample() {
                                                   return pendingAddOps.size();
                                               }
                                           });
@@ -240,6 +243,7 @@ public class LedgerHandle implements WriteHandle {
      *
      * @return the id of the ledger
      */
+    @Override
     public long getId() {
         return ledgerId;
     }
@@ -344,6 +348,7 @@ public class LedgerHandle implements WriteHandle {
      *
      * @return the length of the ledger in bytes
      */
+    @Override
     public synchronized long getLength() {
         return this.length;
     }
@@ -453,7 +458,7 @@ public class LedgerHandle implements WriteHandle {
      * @param rc
      */
     void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
-        bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() {
+        bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
                 final long prevLastEntryId;
@@ -512,7 +517,7 @@ public class LedgerHandle implements WriteHandle {
                               + metadata.getLastEntryId() + " with this many bytes: " + metadata.getLength());
                 }
 
-                final class CloseCb extends OrderedSafeGenericCallback<Void> {
+                final class CloseCb extends OrderedGenericCallback<Void> {
                     CloseCb() {
                         super(bk.getMainWorkerPool(), ledgerId);
                     }
@@ -520,7 +525,7 @@ public class LedgerHandle implements WriteHandle {
                     @Override
                     public void safeOperationComplete(final int rc, Void result) {
                         if (rc == BKException.Code.MetadataVersionException) {
-                            rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
+                            rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
                                                                                           ledgerId) {
                                 @Override
                                 public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
@@ -830,7 +835,7 @@ public class LedgerHandle implements WriteHandle {
                                                               boolean isRecoveryRead) {
         PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), firstEntry, lastEntry, isRecoveryRead);
         if (!bk.isClosed()) {
-            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } else {
             op.future().completeExceptionally(BKException.create(ClientClosedException));
         }
@@ -1099,7 +1104,7 @@ public class LedgerHandle implements WriteHandle {
         }
 
         try {
-            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
             op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
                     LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
@@ -1689,7 +1694,7 @@ public class LedgerHandle implements WriteHandle {
      * reformed ensemble. On MetadataVersionException, will reread latest
      * ledgerMetadata and act upon.
      */
-    private final class ChangeEnsembleCb extends OrderedSafeGenericCallback<Void> {
+    private final class ChangeEnsembleCb extends OrderedGenericCallback<Void> {
         private final EnsembleInfo ensembleInfo;
         private final int curBlockAddCompletions;
         private final int ensembleChangeIdx;
@@ -1749,7 +1754,7 @@ public class LedgerHandle implements WriteHandle {
      * Callback which is reading the ledgerMetadata present in zk. This will try
      * to resolve the version conflicts.
      */
-    private final class ReReadLedgerMetadataCb extends OrderedSafeGenericCallback<LedgerMetadata> {
+    private final class ReReadLedgerMetadataCb extends OrderedGenericCallback<LedgerMetadata> {
         private final int rc;
         private final EnsembleInfo ensembleInfo;
         private final int curBlockAddCompletions;
@@ -2002,11 +2007,11 @@ public class LedgerHandle implements WriteHandle {
             return;
         }
 
-        writeLedgerConfig(new OrderedSafeGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
+        writeLedgerConfig(new OrderedGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
             @Override
             public void safeOperationComplete(final int rc, Void result) {
                 if (rc == BKException.Code.MetadataVersionException) {
-                    rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
+                    rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
                                                                                   ledgerId) {
                         @Override
                         public void safeOperationComplete(int rc, LedgerMetadata newMeta) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index afd56cf..0eaf0b5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -238,7 +238,7 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
         }
 
         try {
-            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
             op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
                               LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 16b577b..33cfaf2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -38,7 +38,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -183,7 +183,7 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
         }
 
         if (doRecovery) {
-            lh.recover(new OrderedSafeGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
+            lh.recover(new OrderedGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
                 @Override
                 public void safeOperationComplete(int rc, Void result) {
                     if (rc == BKException.Code.OK) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index dd5058e..eb95766 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -143,7 +143,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
 
     void timeoutQuorumWait() {
         try {
-            lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, new SafeRunnable() {
+            lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     if (completed) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 484e9c7..a41207d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -505,7 +505,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
     }
 
     public void submit() {
-        lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, this);
+        lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
     }
 
     void initiate() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 9d62f72..eef0e70 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -163,7 +163,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
         }
         if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
             try {
-                bk.getMainWorkerPool().submitOrdered(ledgerId, new MetadataUpdater(newMetadata));
+                bk.getMainWorkerPool().executeOrdered(ledgerId, new MetadataUpdater(newMetadata));
             } catch (RejectedExecutionException ree) {
                 LOG.error("Failed on submitting updater to update ledger metadata on ledger {} : {}",
                         ledgerId, newMetadata);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 285a06a..c160b20 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -63,7 +64,6 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +77,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
     // This is global state that should be across all BookieClients
     AtomicLong totalBytesOutstanding = new AtomicLong();
 
-    OrderedSafeExecutor executor;
+    OrderedExecutor executor;
     ScheduledExecutorService scheduler;
     ScheduledFuture<?> timeoutFuture;
 
@@ -97,7 +97,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
     private final long bookieErrorThresholdPerInterval;
 
     public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
-                        OrderedSafeExecutor executor, ScheduledExecutorService scheduler,
+                        OrderedExecutor executor, ScheduledExecutorService scheduler,
                         StatsLogger statsLogger) throws IOException {
         this.conf = conf;
         this.eventLoopGroup = eventLoopGroup;
@@ -198,7 +198,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         client.obtain((rc, pcbc) -> {
             if (rc != BKException.Code.OK) {
                 try {
-                    executor.submitOrdered(ledgerId, safeRun(() -> {
+                    executor.executeOrdered(ledgerId, safeRun(() -> {
                         cb.writeLacComplete(rc, ledgerId, addr, ctx);
                     }));
                 } catch (RejectedExecutionException re) {
@@ -219,7 +219,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                              final WriteCallback cb,
                              final Object ctx) {
         try {
-            executor.submitOrdered(ledgerId, new SafeRunnable() {
+            executor.executeOrdered(ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
@@ -266,7 +266,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                               final ReadEntryCallback cb,
                               final Object ctx) {
         try {
-            executor.submitOrdered(ledgerId, new SafeRunnable() {
+            executor.executeOrdered(ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx);
@@ -362,7 +362,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         client.obtain((rc, pcbc) -> {
             if (rc != BKException.Code.OK) {
                 try {
-                    executor.submitOrdered(ledgerId, safeRun(() -> {
+                    executor.executeOrdered(ledgerId, safeRun(() -> {
                         cb.readLacComplete(rc, ledgerId, null, null, ctx);
                     }));
                 } catch (RejectedExecutionException re) {
@@ -526,7 +526,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         byte hello[] = "hello".getBytes(UTF_8);
         long ledger = Long.parseLong(args[2]);
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
-        OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+        OrderedExecutor executor = OrderedExecutor.newBuilder()
                 .name("BookieClientWorker")
                 .numThreads(1)
                 .build();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 94a43a9..7e65ced 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -59,7 +59,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.stats.Counter;
@@ -68,7 +68,6 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,12 +92,12 @@ public class BookieRequestProcessor implements RequestProcessor {
     /**
      * The threadpool used to execute all read entry requests issued to this server.
      */
-    private final OrderedSafeExecutor readThreadPool;
+    private final OrderedExecutor readThreadPool;
 
     /**
      * The threadpool used to execute all add entry requests issued to this server.
      */
-    private final OrderedSafeExecutor writeThreadPool;
+    private final OrderedExecutor writeThreadPool;
 
     /**
      * TLS management.
@@ -109,12 +108,12 @@ public class BookieRequestProcessor implements RequestProcessor {
      * The threadpool used to execute all long poll requests issued to this server
      * after they are done waiting.
      */
-    private final OrderedSafeExecutor longPollThreadPool;
+    private final OrderedExecutor longPollThreadPool;
 
     /**
      * The threadpool used to execute high priority requests.
      */
-    private final OrderedSafeExecutor highPriorityThreadPool;
+    private final OrderedExecutor highPriorityThreadPool;
 
     /**
      * The Timer used to time out requests for long polling.
@@ -162,11 +161,11 @@ public class BookieRequestProcessor implements RequestProcessor {
         this.longPollThreadPool = createExecutor(
                 this.serverCfg.getNumLongPollWorkerThreads(),
                 "BookieLongPollThread-" + serverCfg.getBookiePort(),
-                OrderedScheduler.NO_TASK_LIMIT, statsLogger);
+                OrderedExecutor.NO_TASK_LIMIT, statsLogger);
         this.highPriorityThreadPool = createExecutor(
                 this.serverCfg.getNumHighPriorityWorkerThreads(),
                 "BookieHighPriorityThread-" + serverCfg.getBookiePort(),
-                OrderedScheduler.NO_TASK_LIMIT, statsLogger);
+                OrderedExecutor.NO_TASK_LIMIT, statsLogger);
         this.requestTimer = new HashedWheelTimer(
             new ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
             this.serverCfg.getRequestTimerTickDurationMs(),
@@ -208,7 +207,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         shutdownExecutor(highPriorityThreadPool);
     }
 
-    private OrderedSafeExecutor createExecutor(
+    private OrderedExecutor createExecutor(
             int numThreads,
             String nameFormat,
             int maxTasksInQueue,
@@ -216,7 +215,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         if (numThreads <= 0) {
             return null;
         } else {
-            return OrderedSafeExecutor.newBuilder()
+            return OrderedExecutor.newBuilder()
                     .numThreads(numThreads)
                     .name(nameFormat)
                     .traceTaskExecution(serverCfg.getEnableTaskExecutionStats())
@@ -226,7 +225,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void shutdownExecutor(OrderedSafeExecutor service) {
+    private void shutdownExecutor(OrderedExecutor service) {
         if (null != service) {
             service.shutdown();
         }
@@ -310,7 +309,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         if (null == writeThreadPool) {
             writeLac.run();
         } else {
-            writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), writeLac);
+            writeThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), writeLac);
         }
     }
 
@@ -319,14 +318,14 @@ public class BookieRequestProcessor implements RequestProcessor {
         if (null == readThreadPool) {
             readLac.run();
         } else {
-            readThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), readLac);
+            readThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), readLac);
         }
     }
 
     private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
         WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
 
-        final OrderedSafeExecutor threadPool;
+        final OrderedExecutor threadPool;
         if (RequestUtils.isHighPriority(r)) {
             threadPool = highPriorityThreadPool;
         } else {
@@ -337,7 +336,7 @@ public class BookieRequestProcessor implements RequestProcessor {
             write.run();
         } else {
             try {
-                threadPool.submitOrdered(r.getAddRequest().getLedgerId(), write);
+                threadPool.executeOrdered(r.getAddRequest().getLedgerId(), write);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests",
@@ -361,7 +360,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         ExecutorService fenceThread = null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
 
         final ReadEntryProcessorV3 read;
-        final OrderedSafeExecutor threadPool;
+        final OrderedExecutor threadPool;
         if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
             ExecutorService lpThread = null == longPollThreadPool ? null : longPollThreadPool.chooseThread(c);
 
@@ -387,7 +386,7 @@ public class BookieRequestProcessor implements RequestProcessor {
             read.run();
         } else {
             try {
-                threadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
+                threadPool.executeOrdered(r.getReadRequest().getLedgerId(), read);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests",
@@ -464,7 +463,7 @@ public class BookieRequestProcessor implements RequestProcessor {
 
         // If it's a high priority add (usually as part of recovery process), we want to make sure it gets
         // executed as fast as possible, so bypass the normal writeThreadPool and execute in highPriorityThreadPool
-        final OrderedSafeExecutor threadPool;
+        final OrderedExecutor threadPool;
         if (r.isHighPriority()) {
             threadPool = highPriorityThreadPool;
         } else {
@@ -475,7 +474,7 @@ public class BookieRequestProcessor implements RequestProcessor {
             write.run();
         } else {
             try {
-                threadPool.submitOrdered(r.getLedgerId(), write);
+                threadPool.executeOrdered(r.getLedgerId(), write);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", r.ledgerId,
@@ -494,7 +493,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         // If it's a high priority read (fencing or as part of recovery process), we want to make sure it
         // gets executed as fast as possible, so bypass the normal readThreadPool
         // and execute in highPriorityThreadPool
-        final OrderedSafeExecutor threadPool;
+        final OrderedExecutor threadPool;
         if (r.isHighPriority() || r.isFencing()) {
             threadPool = highPriorityThreadPool;
         } else {
@@ -505,7 +504,7 @@ public class BookieRequestProcessor implements RequestProcessor {
             read.run();
         } else {
             try {
-                threadPool.submitOrdered(r.getLedgerId(), read);
+                threadPool.executeOrdered(r.getLedgerId(), read);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", r.ledgerId,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 439320b..20fcdda 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -85,6 +85,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -120,7 +121,6 @@ import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
@@ -149,7 +149,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
     final BookieSocketAddress addr;
     final EventLoopGroup eventLoopGroup;
-    final OrderedSafeExecutor executor;
+    final OrderedExecutor executor;
     final long addEntryTimeoutNanos;
     final long readEntryTimeoutNanos;
     final int maxFrameSize;
@@ -213,13 +213,13 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     private final ExtensionRegistry extRegistry;
     private final SecurityHandlerFactory shFactory;
 
-    public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup,
+    public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
                                   BookieSocketAddress addr) throws SecurityException {
         this(new ClientConfiguration(), executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE, null, null,
                 null);
     }
 
-    public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup,
+    public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
                                   BookieSocketAddress addr,
                                   ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry) throws SecurityException {
@@ -227,7 +227,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 authProviderFactory, extRegistry, null);
     }
 
-    public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
+    public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
                                   EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
                                   StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
@@ -236,7 +236,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 authProviderFactory, extRegistry, pcbcPool, null);
     }
 
-    public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
+    public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
                                   EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
                                   StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
@@ -1111,7 +1111,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             response.release();
         } else {
             long orderingKey = completionValue.ledgerId;
-            executor.submitOrdered(orderingKey,
+            executor.executeOrdered(orderingKey,
                     ReadV2ResponseCallback.create(completionValue, response.ledgerId, response.entryId,
                                                   status, response));
         }
@@ -1226,7 +1226,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             }
         } else {
             long orderingKey = completionValue.ledgerId;
-            executor.submitOrdered(orderingKey, new SafeRunnable() {
+            executor.executeOrdered(orderingKey, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     completionValue.handleV3Response(response);
@@ -1400,7 +1400,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
 
         protected void errorOutAndRunCallback(final Runnable callback) {
-            executor.submitOrdered(ledgerId,
+            executor.executeOrdered(ledgerId,
                     new SafeRunnable() {
                         @Override
                         public void safeRun() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java
new file mode 100644
index 0000000..73150ad
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic callback implementation which will run the
+ * callback in the thread which matches the ordering key.
+ */
+public abstract class OrderedGenericCallback<T> implements GenericCallback<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(OrderedGenericCallback.class);
+
+    private final OrderedExecutor executor;
+    private final long orderingKey;
+
+    /**
+     * @param executor The executor on which to run the callback
+     * @param orderingKey Key used to decide which thread the callback
+     *                    should run on.
+     */
+    public OrderedGenericCallback(OrderedExecutor executor, long orderingKey) {
+        this.executor = executor;
+        this.orderingKey = orderingKey;
+    }
+
+    @Override
+    public final void operationComplete(final int rc, final T result) {
+        // during closing, callbacks that are error out might try to submit to
+        // the scheduler again. if the submission will go to same thread, we
+        // don't need to submit to executor again. this is also an optimization for
+        // callback submission
+        if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
+            safeOperationComplete(rc, result);
+        } else {
+            try {
+                executor.executeOrdered(orderingKey, new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        safeOperationComplete(rc, result);
+                    }
+                    @Override
+                    public String toString() {
+                        return String.format("Callback(key=%s, name=%s)",
+                                             orderingKey,
+                                             OrderedGenericCallback.this);
+                    }
+                });
+            } catch (RejectedExecutionException re) {
+                LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
+            }
+        }
+    }
+
+    public abstract void safeOperationComplete(int rc, T result);
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
deleted file mode 100644
index f3af7df..0000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.util;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides 2 things over the java {@link ScheduledExecutorService}.
- *
- * <p>1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
- * This means that exceptions in scheduled tasks wont go unnoticed and will be
- * logged.
- *
- * <p>2. It supports submitting tasks with an ordering key, so that tasks submitted
- * with the same key will always be executed in order, but tasks across
- * different keys can be unordered. This retains parallelism while retaining the
- * basic amount of ordering we want (e.g. , per ledger handle). Ordering is
- * achieved by hashing the key objects to threads by their {@link #hashCode()}
- * method.
- *
- * <p>Note: deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.OrderedScheduler}.
- */
-public class OrderedSafeExecutor extends org.apache.bookkeeper.common.util.OrderedScheduler {
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * A builder class for an OrderedSafeExecutor.
-     */
-    public static class Builder extends AbstractBuilder<OrderedSafeExecutor> {
-
-        public OrderedSafeExecutor build() {
-            if (null == threadFactory) {
-                threadFactory = new DefaultThreadFactory("bookkeeper-ordered-safe-executor");
-            }
-            return new OrderedSafeExecutor(name, numThreads, threadFactory, statsLogger,
-                                           traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
-        }
-    }
-
-    /**
-     * Constructs Safe executor.
-     *
-     * @param numThreads
-     *            - number of threads
-     * @param baseName
-     *            - base name of executor threads
-     * @param threadFactory
-     *            - for constructing threads
-     * @param statsLogger
-     *            - for reporting executor stats
-     * @param traceTaskExecution
-     *            - should we stat task execution
-     * @param warnTimeMicroSec
-     *            - log long task exec warning after this interval
-     * @param maxTasksInQueue
-     *            - maximum items allowed in a thread queue. -1 for no limit
-     */
-    private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
-                                StatsLogger statsLogger, boolean traceTaskExecution,
-                                long warnTimeMicroSec, int maxTasksInQueue) {
-        super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
-    }
-
-    /**
-     * Schedules a one time action to execute.
-     */
-    public void submit(SafeRunnable r) {
-        super.submit(r);
-    }
-
-    /**
-     * Schedules a one time action to execute with an ordering guarantee on the key.
-     * @param orderingKey
-     * @param r
-     */
-    public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
-        return super.submitOrdered(orderingKey, r);
-    }
-
-    /**
-     * Schedules a one time action to execute with an ordering guarantee on the key.
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(long orderingKey, SafeRunnable r) {
-        super.submitOrdered(orderingKey, r);
-    }
-
-    /**
-     * Schedules a one time action to execute with an ordering guarantee on the key.
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(int orderingKey, SafeRunnable r) {
-        super.submitOrdered(orderingKey, r);
-    }
-
-    /**
-     * Creates and executes a one-shot action that becomes enabled after the given delay.
-     *
-     * @param command - the SafeRunnable to execute
-     * @param delay - the time from now to delay execution
-     * @param unit - the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of the task and whose get() method
-     *      will return null upon completion
-     */
-    public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
-        return super.schedule(command, delay, unit);
-    }
-
-    /**
-     * Creates and executes a one-shot action that becomes enabled after the given delay.
-     *
-     * @param orderingKey - the key used for ordering
-     * @param command - the SafeRunnable to execute
-     * @param delay - the time from now to delay execution
-     * @param unit - the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of the task and whose get() method
-     *      will return null upon completion
-     */
-    public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
-        return super.scheduleOrdered(orderingKey, command, delay, unit);
-    }
-
-    /**
-     * Creates and executes a periodic action that becomes enabled first after
-     * the given initial delay, and subsequently with the given period.
-     *
-     * <p>For more details check scheduleAtFixedRate in interface ScheduledExecutorService
-     *
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param period - the period between successive executions
-     * @param unit - the time unit of the initialDelay and period parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get()
-     *      method will throw an exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
-        return super.scheduleAtFixedRate(command, initialDelay, period, unit);
-    }
-
-    /**
-     * Creates and executes a periodic action that becomes enabled first after
-     * the given initial delay, and subsequently with the given period.
-     *
-     * <p>For more details check scheduleAtFixedRate in interface ScheduledExecutorService
-     *
-     * @param orderingKey - the key used for ordering
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param period - the period between successive executions
-     * @param unit - the time unit of the initialDelay and period parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
-     * will throw an exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
-            long period, TimeUnit unit) {
-        return super.scheduleAtFixedRateOrdered(orderingKey, command, initialDelay, period, unit);
-    }
-
-    /**
-     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
-     * with the given delay between the termination of one execution and the commencement of the next.
-     *
-     * <p>For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
-     *
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param delay - the delay between the termination of one execution and the commencement of the next
-     * @param unit - the time unit of the initialDelay and delay parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
-     * will throw an exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
-            TimeUnit unit) {
-        return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
-    }
-
-    /**
-     * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
-     * with the given delay between the termination of one execution and the commencement of the next.
-     *
-     * <p>For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
-     *
-     * @param orderingKey - the key used for ordering
-     * @param command - the SafeRunnable to execute
-     * @param initialDelay - the time to delay first execution
-     * @param delay - the delay between the termination of one execution and the commencement of the next
-     * @param unit - the time unit of the initialDelay and delay parameters
-     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
-     * will throw an exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
-            long delay, TimeUnit unit) {
-        return super.scheduleWithFixedDelayOrdered(orderingKey, command, initialDelay, delay, unit);
-    }
-
-    /**
-     * Generic callback implementation which will run the
-     * callback in the thread which matches the ordering key.
-     */
-    public abstract static class OrderedSafeGenericCallback<T>
-            implements GenericCallback<T> {
-        private static final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class);
-
-        private final OrderedSafeExecutor executor;
-        private final long orderingKey;
-
-        /**
-         * @param executor The executor on which to run the callback
-         * @param orderingKey Key used to decide which thread the callback
-         *                    should run on.
-         */
-        public OrderedSafeGenericCallback(OrderedSafeExecutor executor, long orderingKey) {
-            this.executor = executor;
-            this.orderingKey = orderingKey;
-        }
-
-        @Override
-        public final void operationComplete(final int rc, final T result) {
-            // during closing, callbacks that are error out might try to submit to
-            // the scheduler again. if the submission will go to same thread, we
-            // don't need to submit to executor again. this is also an optimization for
-            // callback submission
-            if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
-                safeOperationComplete(rc, result);
-            } else {
-                try {
-                    executor.submitOrdered(orderingKey, new SafeRunnable() {
-                        @Override
-                        public void safeRun() {
-                            safeOperationComplete(rc, result);
-                        }
-                        @Override
-                        public String toString() {
-                            return String.format("Callback(key=%s, name=%s)",
-                                                 orderingKey,
-                                                 OrderedSafeGenericCallback.this);
-                        }
-                    });
-                } catch (RejectedExecutionException re) {
-                    LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
-                }
-            }
-        }
-
-        public abstract void safeOperationComplete(int rc, T result);
-    }
-}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index e686742..5efd7bd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -29,8 +29,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Optional;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -41,15 +43,16 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
@@ -61,7 +64,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
 import org.mockito.invocation.InvocationOnMock;
@@ -76,8 +78,8 @@ public abstract class MockBookKeeperTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(MockBookKeeperTestCase.class);
 
-    protected ScheduledExecutorService scheduler;
-    protected OrderedSafeExecutor executor;
+    protected OrderedScheduler scheduler;
+    protected OrderedExecutor executor;
     protected BookKeeper bk;
     protected BookieClient bookieClient;
     protected LedgerManager ledgerManager;
@@ -128,8 +130,8 @@ public abstract class MockBookKeeperTestCase {
         mockLedgerData = new ConcurrentHashMap<>();
         mockNextLedgerId = new AtomicLong(1);
         fencedLedgers = new ConcurrentSkipListSet<>();
-        scheduler = new ScheduledThreadPoolExecutor(4);
-        executor = OrderedSafeExecutor.newBuilder().build();
+        scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(4).name("bk-test").build();
+        executor = OrderedExecutor.newBuilder().build();
         bookieWatcher = mock(BookieWatcher.class);
 
         bookieClient = mock(BookieClient.class);
@@ -312,7 +314,7 @@ public abstract class MockBookKeeperTestCase {
         doAnswer(invocation -> {
             Object[] args = invocation.getArguments();
             Long ledgerId = (Long) args[0];
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[1];
                 LedgerMetadata ledgerMetadata = mockLedgerMetadataRegistry.get(ledgerId);
                 if (ledgerMetadata == null) {
@@ -330,7 +332,7 @@ public abstract class MockBookKeeperTestCase {
         doAnswer(invocation -> {
             Object[] args = invocation.getArguments();
             Long ledgerId = (Long) args[0];
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
                 if (mockLedgerMetadataRegistry.remove(ledgerId) != null) {
                     cb.operationComplete(BKException.Code.OK, null);
@@ -368,7 +370,7 @@ public abstract class MockBookKeeperTestCase {
             Object[] args = invocation.getArguments();
             BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
             Long ledgerId = (Long) args[0];
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 LedgerMetadata ledgerMetadata = (LedgerMetadata) args[1];
                 mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(ledgerMetadata));
                 cb.operationComplete(BKException.Code.OK, null);
@@ -384,7 +386,7 @@ public abstract class MockBookKeeperTestCase {
             Long ledgerId = (Long) args[0];
             LedgerMetadata metadata = (LedgerMetadata) args[1];
             BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(metadata));
                 cb.operationComplete(BKException.Code.OK, null);
             });
@@ -403,7 +405,7 @@ public abstract class MockBookKeeperTestCase {
                 (BookkeeperInternalCallbacks.ReadEntryCallback) args[3];
             boolean fenced = (((Integer) args[5]) & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING;
 
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 DigestManager macManager = null;
                 try {
                     macManager = getDigestType(ledgerId);
@@ -478,7 +480,7 @@ public abstract class MockBookKeeperTestCase {
             boolean isRecoveryAdd =
                 ((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD;
 
-            executor.submitOrdered(ledgerId, () -> {
+            executor.executeOrdered(ledgerId, () -> {
                 byte[] entry;
                 try {
                     entry = extractEntryPayload(ledgerId, entryId, toSend);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index bf6c230..a91dffa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -41,7 +42,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCall
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,7 +56,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
     private static final Logger LOG = LoggerFactory.getLogger(TestGetBookieInfoTimeout.class);
     DigestType digestType;
     public EventLoopGroup eventLoopGroup;
-    public OrderedSafeExecutor executor;
+    public OrderedExecutor executor;
     private ScheduledExecutorService scheduler;
 
     public TestGetBookieInfoTimeout() {
@@ -69,7 +69,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
         super.setUp();
         eventLoopGroup = new NioEventLoopGroup();
 
-        executor = OrderedSafeExecutor.newBuilder()
+        executor = OrderedExecutor.newBuilder()
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
                 .build();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 59811d1..a07acef 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.auth.TestAuth;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -49,7 +50,6 @@ import org.apache.bookkeeper.proto.BookieProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.Test;
 
 /**
@@ -64,7 +64,7 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
     ExtensionRegistry extRegistry = ExtensionRegistry.newInstance();
     ClientAuthProvider.Factory authProvider;
     EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-    OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient")
+    OrderedExecutor executor = OrderedExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient")
             .build();
 
     public TestBackwardCompatCMS42() throws Exception {
@@ -191,7 +191,7 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
         final CountDownLatch connected = new CountDownLatch(1);
 
         CompatClient42(ClientConfiguration conf,
-                       OrderedSafeExecutor executor,
+                       OrderedExecutor executor,
                        EventLoopGroup eventLoopGroup,
                        BookieSocketAddress addr,
                        ClientAuthProvider.Factory authProviderFactory,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 90788b1..d96dce1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -46,7 +47,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -79,7 +79,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
     @Test
     public void testConnectCloseRace() throws Exception {
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        OrderedSafeExecutor executor = getOrderedSafeExecutor();
+        OrderedExecutor executor = getOrderedSafeExecutor();
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 1000; i++) {
@@ -98,8 +98,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         executor.shutdown();
     }
 
-    public OrderedSafeExecutor getOrderedSafeExecutor() {
-        return OrderedSafeExecutor.newBuilder()
+    public OrderedExecutor getOrderedSafeExecutor() {
+        return OrderedExecutor.newBuilder()
             .name("PCBC")
             .numThreads(1)
             .traceTaskExecution(true)
@@ -122,7 +122,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
             }
         };
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        OrderedSafeExecutor executor = getOrderedSafeExecutor();
+        OrderedExecutor executor = getOrderedSafeExecutor();
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 100; i++) {
@@ -154,7 +154,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         };
         final int iterations = 100000;
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        OrderedSafeExecutor executor = getOrderedSafeExecutor();
+        OrderedExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
         final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
@@ -250,7 +250,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         bs.add(startBookie(conf, delayBookie));
 
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        final OrderedSafeExecutor executor = getOrderedSafeExecutor();
+        final OrderedExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
         final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
@@ -268,7 +268,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
             @Override
             public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
                 if (rc != BKException.Code.OK) {
-                    executor.submitOrdered(1, new SafeRunnable() {
+                    executor.executeOrdered(1, new SafeRunnable() {
                         @Override
                         public void safeRun() {
                             cb.readEntryComplete(rc, 1, 1, null, null);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 7ecce0b..29c2e3c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -54,7 +55,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -68,7 +68,7 @@ public class BookieClientTest {
     public int port = 13645;
 
     public EventLoopGroup eventLoopGroup;
-    public OrderedSafeExecutor executor;
+    public OrderedExecutor executor;
     private ScheduledExecutorService scheduler;
 
     @Before
@@ -85,7 +85,7 @@ public class BookieClientTest {
         bs = new BookieServer(conf);
         bs.start();
         eventLoopGroup = new NioEventLoopGroup();
-        executor = OrderedSafeExecutor.newBuilder()
+        executor = OrderedExecutor.newBuilder()
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
                 .build();
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 1886c2a..c5f7f07 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -123,6 +123,9 @@
   <Match>
     <Class name="~org\.apache\.bookkeeper\.stats\.generated.*" />
   </Match>
+  <Match>
+    <Class name="~org\.apache\.bookkeeper\.common\.generated.*" />
+  </Match>
 
   <!-- modules under stream/ -->
 
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java
new file mode 100644
index 0000000..66ce59b
--- /dev/null
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Microbenchmarks for different executors providers.
+ */
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Threads(16)
+@Fork(1)
+@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+public class OrderedExecutorBenchmark {
+
+    private static Map<String, Supplier<ExecutorService>> providers = ImmutableMap.of( //
+            "JDK-ThreadPool", () -> Executors.newFixedThreadPool(1),
+            "OrderedExecutor", () -> OrderedExecutor.newBuilder().numThreads(1).build(), //
+            "OrderedScheduler", () -> OrderedScheduler.newSchedulerBuilder().numThreads(1).build());
+
+    @State(Scope.Benchmark)
+    public static class TestState {
+        @Param({ "JDK-ThreadPool", "OrderedExecutor", "OrderedScheduler" })
+        private String executorName;
+
+        private ExecutorService executor;
+
+        @Setup(Level.Trial)
+        public void setup() {
+            executor = providers.get(executorName).get();
+        }
+
+        @TearDown(Level.Trial)
+        public void teardown() {
+            executor.shutdown();
+        }
+    }
+
+    @Benchmark
+    public void submitAndWait(TestState s) throws Exception {
+        s.executor.submit(() -> {
+        }).get();
+    }
+}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 6adcbc4..3c8dfb7 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -391,6 +391,7 @@ class BKAsyncLogReader implements AsyncLogReader, SafeRunnable, AsyncNotificatio
         return readInternal(1, 0, TimeUnit.MILLISECONDS).thenApply(READ_NEXT_MAP_FUNCTION);
     }
 
+    @Override
     public synchronized CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries) {
         return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
     }
@@ -477,7 +478,7 @@ class BKAsyncLogReader implements AsyncLogReader, SafeRunnable, AsyncNotificatio
         long prevCount = scheduleCountUpdater.getAndIncrement(this);
         if (0 == prevCount) {
             scheduleDelayStopwatch.reset().start();
-            scheduler.submitOrdered(streamName, this);
+            scheduler.executeOrdered(streamName, this);
         }
     }
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index a0dd6ad..c9bca44 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -394,7 +394,7 @@ class ReadAheadEntryReader implements
             }
         }
         try {
-            scheduler.submitOrdered(streamName, runnable);
+            scheduler.executeOrdered(streamName, runnable);
         } catch (RejectedExecutionException ree) {
             logger.debug("Failed to submit and execute an operation for readhead entry reader of {}",
                     streamName, ree);
@@ -449,7 +449,7 @@ class ReadAheadEntryReader implements
         // use runnable here instead of CloseableRunnable,
         // because we need this to be executed
         try {
-            scheduler.submitOrdered(streamName, () -> unsafeAsyncClose(closeFuture));
+            scheduler.executeOrdered(streamName, () -> unsafeAsyncClose(closeFuture));
         } catch (RejectedExecutionException ree) {
             logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}",
                     streamName, ree);
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
index 5a9ee1a..d78b455 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -212,7 +212,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
             if (closed) {
                 return;
             }
-            scheduler.submitOrdered(key, r);
+            scheduler.executeOrdered(key, r);
         } finally {
             closeLock.readLock().unlock();
         }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index 898c113..813e974 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -691,7 +691,7 @@ public class BKLogSegmentEntryReader implements SafeRunnable, LogSegmentEntryRea
 
         long prevCount = scheduleCountUpdater.getAndIncrement(this);
         if (0 == prevCount) {
-            scheduler.submitOrdered(getSegment().getLogSegmentId(), this);
+            scheduler.executeOrdered(getSegment().getLogSegmentId(), this);
         }
     }
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index 7948084..107c5e3 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -135,6 +135,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
      * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter
      * list--is executed synchronously, but the lock wait itself doesn't block.
      */
+    @Override
     public synchronized CompletableFuture<ZKDistributedLock> asyncAcquire() {
         if (null != lockAcquireFuture) {
             return FutureUtils.exception(
@@ -145,7 +146,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
             if (null == throwable || !(throwable instanceof CancellationException)) {
                 return;
             }
-            lockStateExecutor.submitOrdered(lockPath, () -> asyncClose());
+            lockStateExecutor.executeOrdered(lockPath, () -> asyncClose());
         });
         final Stopwatch stopwatch = Stopwatch.createStarted();
         promise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
@@ -165,7 +166,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
             }
         });
         this.lockAcquireFuture = promise;
-        lockStateExecutor.submitOrdered(
+        lockStateExecutor.executeOrdered(
             lockPath, () -> doAsyncAcquireWithSemaphore(promise, lockTimeout));
         return promise;
     }
@@ -293,6 +294,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
      *
      * @throws LockingException     if the lock attempt fails
      */
+    @Override
     public synchronized void checkOwnershipAndReacquire() throws LockingException {
         if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
             throw new LockingException(lockPath, "check ownership before acquiring");
@@ -315,6 +317,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
      *
      * @throws LockingException     if the lock attempt fails
      */
+    @Override
     public synchronized void checkOwnership() throws LockingException {
         if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
             throw new LockingException(lockPath, "check ownership before acquiring");
@@ -426,7 +429,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
                 FutureUtils.complete(closePromise, null);
             }
         }, lockStateExecutor.chooseThread(lockPath));
-        lockStateExecutor.submitOrdered(
+        lockStateExecutor.executeOrdered(
             lockPath, () -> closeWaiter(lockWaiter, closeWaiterFuture));
         return closePromise;
     }
@@ -434,7 +437,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
     void internalReacquireLock(final AtomicInteger numRetries,
                                final long lockTimeout,
                                final CompletableFuture<ZKDistributedLock> reacquirePromise) {
-        lockStateExecutor.submitOrdered(
+        lockStateExecutor.executeOrdered(
             lockPath, () -> doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise));
     }
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index 3d79336..84c516c 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -168,6 +168,7 @@ class ZKSessionLock implements SessionLock {
     }
 
     static final Comparator<String> MEMBER_COMPARATOR = new Comparator<String>() {
+        @Override
         public int compare(String o1, String o2) {
             int l1 = parseMemberID(o1);
             int l2 = parseMemberID(o2);
@@ -374,6 +375,7 @@ class ZKSessionLock implements SessionLock {
         return lockId;
     }
 
+    @Override
     public boolean isLockExpired() {
         return lockState.isExpiredOrClosing();
     }
@@ -392,7 +394,7 @@ class ZKSessionLock implements SessionLock {
      *          function to execute a lock action
      */
     protected void executeLockAction(final int lockEpoch, final LockAction func) {
-        lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+        lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
                 if (getEpoch() == lockEpoch) {
@@ -431,7 +433,7 @@ class ZKSessionLock implements SessionLock {
      */
     protected <T> void executeLockAction(final int lockEpoch,
                                          final LockAction func, final CompletableFuture<T> promise) {
-        lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+        lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
                 int currentEpoch = getEpoch();
@@ -554,7 +556,7 @@ class ZKSessionLock implements SessionLock {
                 @Override
                 public void processResult(final int rc, String path, Object ctx,
                                           final List<String> children, Stat stat) {
-                    lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+                    lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
                         @Override
                         public void safeRun() {
                             if (!lockState.inState(State.INIT)) {
@@ -648,7 +650,7 @@ class ZKSessionLock implements SessionLock {
     private boolean checkOrClaimLockOwner(final Pair<String, Long> currentOwner,
                                           final CompletableFuture<String> result) {
         if (lockId.compareTo(currentOwner) != 0 && !lockContext.hasLockId(currentOwner)) {
-            lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+            lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     result.complete(currentOwner.getLeft());
@@ -878,7 +880,7 @@ class ZKSessionLock implements SessionLock {
         // Use lock executor here rather than lock action, because we want this opertaion to be applied
         // whether the epoch has changed or not. The member node is EPHEMERAL_SEQUENTIAL so there's no
         // risk of an ABA problem where we delete and recreate a node and then delete it again here.
-        lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+        lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
                 acquireFuture.completeExceptionally(cause);
@@ -980,7 +982,7 @@ class ZKSessionLock implements SessionLock {
         zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(final int rc, final String path, Object ctx) {
-                lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+                lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         if (KeeperException.Code.OK.intValue() == rc) {
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
index 517b0df..437bb6f 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
@@ -65,6 +65,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
     private BookKeeperClient bkc;
     private ZooKeeperClient zkc;
 
+    @Override
     @Before
     public void setup() throws Exception {
         super.setup();
@@ -94,6 +95,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
                 .build();
     }
 
+    @Override
     @After
     public void teardown() throws Exception {
         if (null != bkc) {
@@ -138,15 +140,8 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
 
     private void ensureOrderSchedulerEmpty(String streamName) throws Exception {
         final CompletableFuture<Void> promise = new CompletableFuture<Void>();
-        scheduler.submitOrdered(streamName, () -> {
+        scheduler.executeOrdered(streamName, () -> {
             FutureUtils.complete(promise, null);
-            // the following line is needed for oraclejdk9 to avoid following exception
-            // ```
-            // incompatible types: inference variable T has incompatible bounds
-            // upper bounds: java.lang.Object
-            // lower bounds: void
-            // ```
-            return (Void) null;
         });
         Utils.ioResult(promise);
     }
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
index 901d2a0..8a9d9c2 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
@@ -779,7 +779,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         // expire session
         ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
         // submit a runnable to lock state executor to ensure any state changes happened when session expired
-        lockStateExecutor.submitOrdered(lockPath, () -> expiredLatch.countDown());
+        lockStateExecutor.executeOrdered(lockPath, () -> expiredLatch.countDown());
         expiredLatch.await();
         // no watcher was registered if never acquired lock successfully
         assertEquals(State.INIT, lock.getLockState());
@@ -1216,7 +1216,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         } else {
             ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
             final CountDownLatch latch = new CountDownLatch(1);
-            lockStateExecutor.submitOrdered(lockPath, () -> latch.countDown());
+            lockStateExecutor.executeOrdered(lockPath, () -> latch.countDown());
             latch.await();
             children = getLockWaiters(zkc, lockPath);
             assertEquals(0, children.size());
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
index 6155cba..39e00c0 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
@@ -79,7 +79,7 @@ public final class FailRequestStorageContainer implements StorageContainer {
 
     private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
         CompletableFuture<T> future = FutureUtils.createFuture();
-        scheduler.submitOrdered(scId, () -> {
+        scheduler.executeOrdered(scId, () -> {
             future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
         });
         return future;
diff --git a/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java b/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java
index e462d33..eba29f1 100644
--- a/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java
+++ b/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java
@@ -50,7 +50,7 @@ public class BookKeeperServerShadedJarTest {
 
     @Test
     public void testBookKeeperCommon() throws Exception {
-        Class.forName("org.apache.bookkeeper.util.OrderedSafeExecutor");
+        Class.forName("org.apache.bookkeeper.common.util.OrderedExecutor");
         assertTrue(true);
     }
 
diff --git a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
index 45be333..c4519df 100644
--- a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
+++ b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
@@ -85,13 +85,13 @@ public class DistributedLogCoreShadedJarTest {
 
     @Test(expected = ClassNotFoundException.class)
     public void testBookKeeperCommon() throws Exception {
-        Class.forName("org.apache.bookkeeper.util.OrderedSafeExecutor");
+        Class.forName("org.apache.bookkeeper.common.util.OrderedExecutor");
         assertTrue(true);
     }
 
     @Test
     public void testBookKeeperCommonShade() throws Exception {
-        Class.forName("dlshade.org.apache.bookkeeper.util.OrderedSafeExecutor");
+        Class.forName("dlshade.org.apache.bookkeeper.common.util.OrderedExecutor");
         assertTrue(true);
     }
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.