You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/28 02:10:04 UTC

[GitHub] sijie closed pull request #1211: Use the bk OrderedScheduler in dlog

sijie closed pull request #1211: Use the bk OrderedScheduler in dlog
URL: https://github.com/apache/bookkeeper/pull/1211
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index 76dd375e5..463e29281 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -22,15 +22,20 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.TimeoutException;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -51,7 +56,7 @@
  * achieved by hashing the key objects to threads by their {@link #hashCode()}
  * method.
  */
-public class OrderedScheduler {
+public class OrderedScheduler implements ScheduledExecutorService {
     public static final int NO_TASK_LIMIT = -1;
     protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
 
@@ -312,10 +317,11 @@ public void submit(SafeRunnable r) {
     }
 
     /**
-     * schedules a one time action to execute with an ordering guarantee on the key.
+     * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
      *
-     * @param orderingKey
-     * @param r
+     * @param orderingKey order key to submit the task
+     * @param r task to run
+     * @return listenable future on the completion of the task
      */
     public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
         return chooseThread(orderingKey).submit(timedRunnable(r));
@@ -348,7 +354,7 @@ public void submitOrdered(int orderingKey, SafeRunnable r) {
      * @param callable
      */
     public <T> ListenableFuture<T> submitOrdered(Object orderingKey,
-                                                 java.util.concurrent.Callable<T> callable) {
+                                                 Callable<T> callable) {
         return chooseThread(orderingKey).submit(callable);
     }
 
@@ -463,12 +469,45 @@ protected long getThreadID(long orderingKey) {
         return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void shutdown() {
         for (int i = 0; i < threads.length; i++) {
             threads[i].shutdown();
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Runnable> shutdownNow() {
+        List<Runnable> runnables = new ArrayList<Runnable>();
+        for (ScheduledExecutorService executor : threads) {
+            runnables.addAll(executor.shutdownNow());
+        }
+        return runnables;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isShutdown() {
+        for (ScheduledExecutorService executor : threads) {
+            if (!executor.isShutdown()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
         boolean ret = true;
         for (int i = 0; i < threads.length; i++) {
@@ -477,6 +516,19 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
         return ret;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isTerminated() {
+        for (ScheduledExecutorService executor : threads) {
+            if (!executor.isTerminated()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
      * Force threads shutdown (cancel active requests) after specified delay,
      * to be used after shutdown() rejects new requests.
@@ -494,4 +546,112 @@ public void forceShutdown(long timeout, TimeUnit unit) {
         }
     }
 
+    //
+    // Methods for implementing {@link ScheduledExecutorService}
+    //
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        return chooseThread().schedule(command, delay, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        return chooseThread().schedule(callable, delay, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay, long period, TimeUnit unit) {
+        return chooseThread().scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay, long delay, TimeUnit unit) {
+        return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return chooseThread().submit(task);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return chooseThread().submit(task, result);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Future<?> submit(Runnable task) {
+        return chooseThread().submit(task);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException {
+        return chooseThread().invokeAll(tasks);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+                                         long timeout,
+                                         TimeUnit unit)
+        throws InterruptedException {
+        return chooseThread().invokeAll(tasks, timeout, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException, ExecutionException {
+        return chooseThread().invokeAny(tasks);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        return chooseThread().invokeAny(tasks, timeout, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void execute(Runnable command) {
+        chooseThread().execute(command);
+    }
+
 }
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index aede153ad..58ec8f39e 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -21,6 +21,12 @@
     <!-- imported code -->
     <Class name="~com\.scurrilous\.circe.*" />
   </Match>
+  <!-- bookkeeper-common -->
+  <Match>
+    <Class name="org.apache.bookkeeper.common.util.OrderedScheduler"/>
+    <Method name="submit" />
+    <Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/>
+  </Match>
   <!-- bookkeeper-proto -->
   <Match>
     <!-- generated code, we can't be held responsible for findbugs in it //-->
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
deleted file mode 100644
index cfafca39e..000000000
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.distributedlog.common.util.MathUtil;
-
-/**
- * Ordered Scheduler. It is thread pool based {@link ScheduledExecutorService}, additionally providing
- * the ability to execute/schedule tasks by <code>key</code>. Hence the tasks submitted by same <i>key</i>
- * will be executed in order.
- *
- * <p>The scheduler is comprised of multiple {@link ScheduledExecutorService}s. Each
- * {@link ScheduledExecutorService} is a single thread executor. Normal task submissions will
- * be submitted to executors in a random manner to guarantee load balancing. Keyed task submissions (e.g
- * {@link OrderedScheduler#submit(Object, Runnable)} will be submitted to a dedicated executor based on
- * the hash value of submit <i>key</i>.
- */
-public class OrderedScheduler
-        extends org.apache.bookkeeper.common.util.OrderedScheduler
-        implements ScheduledExecutorService {
-
-    /**
-     * Create a builder to build scheduler.
-     *
-     * @return scheduler builder
-     */
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder for {@link OrderedScheduler}.
-     */
-    public static class Builder {
-
-        private String name = "OrderedScheduler";
-        private int corePoolSize = -1;
-        private ThreadFactory threadFactory = null;
-
-        /**
-         * Set the name of this scheduler. It would be used as part of stats scope and thread name.
-         *
-         * @param name name of the scheduler.
-         * @return scheduler builder
-         */
-        public Builder name(String name) {
-            this.name = name;
-            return this;
-        }
-
-        /**
-         * Set the number of threads to be used in this scheduler.
-         *
-         * @param corePoolSize the number of threads to keep in the pool, even
-         *                     if they are idle
-         * @return scheduler builder
-         */
-        public Builder corePoolSize(int corePoolSize) {
-            this.corePoolSize = corePoolSize;
-            return this;
-        }
-
-        /**
-         * Set the thread factory that the scheduler uses to create a new thread.
-         *
-         * @param threadFactory the factory to use when the executor
-         *                      creates a new thread
-         * @return scheduler builder
-         */
-        public Builder threadFactory(ThreadFactory threadFactory) {
-            this.threadFactory = threadFactory;
-            return this;
-        }
-
-        /**
-         * Build the ordered scheduler.
-         *
-         * @return ordered scheduler
-         */
-        public OrderedScheduler build() {
-            if (corePoolSize <= 0) {
-                corePoolSize = Runtime.getRuntime().availableProcessors();
-            }
-            if (null == threadFactory) {
-                threadFactory = Executors.defaultThreadFactory();
-            }
-
-            return new OrderedScheduler(
-                name,
-                corePoolSize,
-                threadFactory);
-        }
-
-    }
-
-    protected final String name;
-    protected final int corePoolSize;
-    protected final ScheduledExecutorService[] executors;
-    protected final Random random;
-
-    private OrderedScheduler(String name,
-                             int corePoolSize,
-                             ThreadFactory threadFactory) {
-        super(
-            name,
-            corePoolSize,
-            threadFactory,
-            NullStatsLogger.INSTANCE,
-            false,
-            Long.MAX_VALUE,
-            Integer.MAX_VALUE);
-        this.name = name;
-        this.corePoolSize = corePoolSize;
-        this.executors = new ScheduledExecutorService[corePoolSize];
-        for (int i = 0; i < corePoolSize; i++) {
-            ThreadFactory tf = new ThreadFactoryBuilder()
-                .setNameFormat(name + "-scheduler-" + i + "-%d")
-                .setThreadFactory(threadFactory)
-                .build();
-            executors[i] = Executors.newSingleThreadScheduledExecutor(tf);
-        }
-        this.random = new Random(System.currentTimeMillis());
-    }
-
-    protected ScheduledExecutorService chooseExecutor() {
-        return corePoolSize == 1 ? executors[0] : executors[random.nextInt(corePoolSize)];
-    }
-
-    public ScheduledExecutorService chooseExecutor(Object key) {
-        if (null == key) {
-            return chooseExecutor();
-        }
-        return corePoolSize == 1 ? executors[0] :
-            executors[MathUtil.signSafeMod(Objects.hashCode(key), corePoolSize)];
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-        return chooseExecutor().schedule(command, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-        return chooseExecutor().schedule(callable, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
-                                                  long initialDelay, long period, TimeUnit unit) {
-        return chooseExecutor().scheduleAtFixedRate(command, initialDelay, period, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
-                                                     long initialDelay, long delay, TimeUnit unit) {
-        return chooseExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void shutdown() {
-        for (ScheduledExecutorService executor : executors) {
-            executor.shutdown();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Runnable> shutdownNow() {
-        List<Runnable> runnables = new ArrayList<Runnable>();
-        for (ScheduledExecutorService executor : executors) {
-            runnables.addAll(executor.shutdownNow());
-        }
-        return runnables;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isShutdown() {
-        for (ScheduledExecutorService executor : executors) {
-            if (!executor.isShutdown()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isTerminated() {
-        for (ScheduledExecutorService executor : executors) {
-            if (!executor.isTerminated()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit)
-        throws InterruptedException {
-        for (ScheduledExecutorService executor : executors) {
-            if (!executor.awaitTermination(timeout, unit)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-        return chooseExecutor().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-        return chooseExecutor().submit(task, result);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Future<?> submit(Runnable task) {
-        return chooseExecutor().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-        return chooseExecutor().invokeAll(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException {
-        return chooseExecutor().invokeAll(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException, ExecutionException {
-        return chooseExecutor().invokeAny(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        return chooseExecutor().invokeAny(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void execute(Runnable command) {
-        chooseExecutor().execute(command);
-    }
-
-    // Ordered Functions
-
-    public ScheduledFuture<?> schedule(Object key, Runnable command, long delay, TimeUnit unit) {
-        return chooseExecutor(key).schedule(command, delay, unit);
-    }
-
-    public ScheduledFuture<?> scheduleAtFixedRate(Object key,
-                                                  Runnable command,
-                                                  long initialDelay,
-                                                  long period,
-                                                  TimeUnit unit) {
-        return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit);
-    }
-
-    public Future<?> submit(Object key, Runnable command) {
-        return chooseExecutor(key).submit(command);
-    }
-
-    public <T> CompletableFuture<T> submit(Object key, Callable<T> callable) {
-        CompletableFuture<T> future = FutureUtils.createFuture();
-        chooseExecutor(key).submit(() -> {
-            try {
-                future.complete(callable.call());
-            } catch (Exception e) {
-                future.completeExceptionally(e);
-            }
-        });
-        return future;
-    }
-
-}
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/util/package-info.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/util/package-info.java
deleted file mode 100644
index ee17950ed..000000000
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/util/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * defines the utilities used across the project.
- */
-package org.apache.distributedlog.util;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 0612d3aa1..337389633 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -33,6 +33,8 @@
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -44,7 +46,6 @@
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.exceptions.ReadCancelledException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,7 @@
  * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors.
  * </ul>
  */
-class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
+class BKAsyncLogReader implements AsyncLogReader, SafeRunnable, AsyncNotification {
     static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class);
 
     private static final Function<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
@@ -100,14 +101,11 @@
 
     private final boolean returnEndOfStreamRecord;
 
-    private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
-        @Override
-        public void run() {
-            synchronized (scheduleLock) {
-                backgroundScheduleTask = null;
-            }
-            scheduleBackgroundRead();
+    private final SafeRunnable BACKGROUND_READ_SCHEDULER = () -> {
+        synchronized (scheduleLock) {
+            backgroundScheduleTask = null;
         }
+        scheduleBackgroundRead();
     };
 
     // State
@@ -256,9 +254,9 @@ synchronized void releaseCurrentEntry() {
             // Except when idle reader threshold is less than a second (tests?)
             period = Math.min(period, idleErrorThresholdMillis / 5);
 
-            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
+            return scheduler.scheduleAtFixedRateOrdered(streamName, new SafeRunnable() {
                 @Override
-                public void run() {
+                public void safeRun() {
                     PendingReadRequest nextRequest = pendingRequests.peek();
 
                     idleReaderCheckCount.inc();
@@ -474,7 +472,7 @@ public synchronized void scheduleBackgroundRead() {
         long prevCount = scheduleCount.getAndIncrement();
         if (0 == prevCount) {
             scheduleDelayStopwatch.reset().start();
-            scheduler.submit(streamName, this);
+            scheduler.submitOrdered(streamName, this);
         }
     }
 
@@ -567,7 +565,7 @@ private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
     }
 
     @Override
-    public void run() {
+    public void safeRun() {
         synchronized (scheduleLock) {
             if (scheduleDelayStopwatch.isRunning()) {
                 scheduleLatency.registerSuccessfulEvent(
@@ -674,7 +672,7 @@ public void run() {
                         scheduleDelayStopwatch.reset().start();
                         scheduleCount.set(0);
                         // the request could still wait for more records
-                        backgroundScheduleTask = scheduler.schedule(
+                        backgroundScheduleTask = scheduler.scheduleOrdered(
                                 streamName,
                                 BACKGROUND_READ_SCHEDULER,
                                 remainingWaitTime,
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
index 1be0aec8d..43c0973c0 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -34,6 +34,7 @@
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -66,7 +67,6 @@
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.util.Allocator;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index b4cf0f737..02a0cb8d8 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.acl.AccessControlManager;
@@ -45,7 +46,6 @@
 import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
index f48baac1d..afcd7bb57 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogHandler.java
@@ -35,6 +35,7 @@
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -54,7 +55,6 @@
 import org.apache.distributedlog.logsegment.PerStreamLogSegmentCache;
 import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
index 4ce16121c..66ca2de88 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
@@ -30,6 +30,7 @@
 import javax.annotation.Nullable;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -49,7 +50,6 @@
 import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
 import org.apache.distributedlog.metadata.LogMetadataForReader;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index cecb4f917..6263ddf76 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -24,6 +24,8 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.List;
@@ -42,6 +44,7 @@
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
@@ -73,7 +76,6 @@
 import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
 import org.apache.distributedlog.logsegment.LogSegmentWriter;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.SimplePermitLimiter;
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
@@ -388,7 +390,7 @@ DistributedLock getLock() {
 
     @VisibleForTesting
     ScheduledExecutorService getFuturePool() {
-        return scheduler.chooseExecutor(streamName);
+        return scheduler.chooseThread(streamName);
     }
 
     @VisibleForTesting
@@ -1190,7 +1192,7 @@ public void addComplete(final int rc, LedgerHandle handle,
 
         if (null != scheduler) {
             final Stopwatch queuedTime = Stopwatch.createStarted();
-            scheduler.submit(streamName, new Callable<Void>() {
+            Futures.addCallback(scheduler.submitOrdered(streamName, new Callable<Void>() {
                 @Override
                 public Void call() {
                     final Stopwatch deferredTime = Stopwatch.createStarted();
@@ -1208,7 +1210,7 @@ public String toString() {
                     return String.format("AddComplete(Stream=%s, entryId=%d, rc=%d)",
                             fullyQualifiedLogSegment, entryId, rc);
                 }
-            }).whenComplete(new FutureEventListener<Void>() {
+            }), new FutureCallback<Void>() {
                 @Override
                 public void onSuccess(Void done) {
                 }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
index 2c5aaf28b..0ce0a25bb 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -32,6 +32,7 @@
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -62,7 +63,6 @@
 import org.apache.distributedlog.util.Allocator;
 import org.apache.distributedlog.util.DLUtils;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index 0469af118..a0dd6ad12 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -35,6 +35,8 @@
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.callback.LogSegmentListener;
@@ -48,12 +50,9 @@
 import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
 import org.apache.distributedlog.logsegment.LogSegmentFilter;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
 /**
  * New ReadAhead Reader that uses {@link org.apache.distributedlog.logsegment.LogSegmentEntryReader}.
  *
@@ -204,7 +203,7 @@ synchronized boolean isClosed() {
         }
     }
 
-    private abstract class CloseableRunnable implements Runnable {
+    private abstract class CloseableRunnable implements SafeRunnable {
 
         @Override
         public void run() {
@@ -220,8 +219,6 @@ public void run() {
             }
         }
 
-        abstract void safeRun();
-
     }
 
     //
@@ -321,7 +318,7 @@ public ReadAheadEntryReader(String streamName,
 
     private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
         if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) {
-            return scheduler.scheduleAtFixedRate(streamName, () -> {
+            return scheduler.scheduleAtFixedRateOrdered(streamName, () -> {
                 if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
                     return;
                 }
@@ -390,14 +387,14 @@ boolean isInitialized() {
         return isInitialized;
     }
 
-    private void orderedSubmit(Runnable runnable) {
+    private void orderedSubmit(SafeRunnable runnable) {
         synchronized (this) {
             if (null != closePromise) {
                 return;
             }
         }
         try {
-            scheduler.submit(streamName, runnable);
+            scheduler.submitOrdered(streamName, runnable);
         } catch (RejectedExecutionException ree) {
             logger.debug("Failed to submit and execute an operation for readhead entry reader of {}",
                     streamName, ree);
@@ -414,7 +411,7 @@ public void start(final List<LogSegmentMetadata> segmentList) {
     private void removeClosedSegmentReaders() {
         orderedSubmit(new CloseableRunnable() {
             @Override
-            void safeRun() {
+            public void safeRun() {
                 unsafeRemoveClosedSegmentReaders();
             }
         });
@@ -452,12 +449,7 @@ synchronized boolean isClosed() {
         // use runnable here instead of CloseableRunnable,
         // because we need this to be executed
         try {
-            scheduler.submit(streamName, new Runnable() {
-                @Override
-                public void run() {
-                    unsafeAsyncClose(closeFuture);
-                }
-            });
+            scheduler.submitOrdered(streamName, () -> unsafeAsyncClose(closeFuture));
         } catch (RejectedExecutionException ree) {
             logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}",
                     streamName, ree);
@@ -684,7 +676,7 @@ public boolean isReaderIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) {
     void processLogSegments(final List<LogSegmentMetadata> segments) {
         orderedSubmit(new CloseableRunnable() {
             @Override
-            void safeRun() {
+            public void safeRun() {
                 unsafeProcessLogSegments(segments);
             }
         });
@@ -907,7 +899,7 @@ private boolean isAllowedToPosition(LogSegmentMetadata segment, DLSN fromDLSN) {
     void moveToNextLogSegment() {
         orderedSubmit(new CloseableRunnable() {
             @Override
-            void safeRun() {
+            public void safeRun() {
                 unsafeMoveToNextLogSegment();
             }
         });
@@ -957,7 +949,7 @@ private void unsafeMoveToNextLogSegment() {
     void scheduleReadNext() {
         orderedSubmit(new CloseableRunnable() {
             @Override
-            void safeRun() {
+            public void safeRun() {
                 if (null == currentSegmentReader) {
                     pauseReadAheadOnNoMoreLogSegments();
                     return;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
index 52d19c3d5..2ecd33855 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
@@ -41,6 +41,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -66,7 +67,6 @@
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.thrift.AccessControlEntry;
 import org.apache.distributedlog.tools.DistributedLogTool;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -738,9 +738,9 @@ protected int runCmd() throws Exception {
                             getLogSegmentMetadataStore()) :
                     LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
                             getLogSegmentMetadataStore());
-            OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+            OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
                     .name("dlck-scheduler")
-                    .corePoolSize(Runtime.getRuntime().availableProcessors())
+                    .numThreads(Runtime.getRuntime().availableProcessors())
                     .build();
             ExecutorService executorService = Executors.newCachedThreadPool();
             try {
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java
index 8484be9cd..ab552dc5c 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java
@@ -23,6 +23,7 @@
 import java.net.URI;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Stable;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
@@ -40,7 +41,6 @@
 import org.apache.distributedlog.namespace.NamespaceDriverManager;
 import org.apache.distributedlog.util.ConfUtils;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.SimplePermitLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -230,9 +230,9 @@ public Namespace build()
         StatsLogger perLogStatsLogger = normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf);
 
         // build the scheduler
-        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("DLM-" + normalizedUri.getPath())
-                .corePoolSize(_conf.getNumWorkerThreads())
+                .numThreads(_conf.getNumWorkerThreads())
                 .build();
 
         // initialize the namespace driver
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
index 28c699bd4..f64bae117 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
@@ -67,7 +68,6 @@
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.namespace.NamespaceDriverManager;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
index 661688cd5..a37484e2d 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
@@ -26,13 +26,13 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.commons.lang.StringUtils;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.callback.NamespaceListener;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.metadata.LogMetadataStore;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
index ebfe4c743..5a9ee1a6c 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -32,6 +32,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -46,7 +48,6 @@
 import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Transaction.OpListener;
 import org.apache.distributedlog.util.Utils;
@@ -73,7 +74,7 @@
 
     private static final List<String> EMPTY_LIST = ImmutableList.of();
 
-    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>> {
+    private static class ReadLogSegmentsTask implements SafeRunnable, FutureEventListener<Versioned<List<String>>> {
 
         private final String logSegmentsPath;
         private final ZKLogSegmentMetadataStore store;
@@ -112,7 +113,7 @@ public void onFailure(Throwable cause) {
         }
 
         @Override
-        public void run() {
+        public void safeRun() {
             if (null != store.listeners.get(logSegmentsPath)) {
                 store.zkGetLogSegmentNames(logSegmentsPath, store).whenComplete(this);
             } else {
@@ -193,25 +194,25 @@ public ZKLogSegmentMetadataStore(DistributedLogConfiguration conf,
         this.skipMinVersionCheck = conf.getDLLedgerMetadataSkipMinVersionCheck();
     }
 
-    protected void scheduleTask(Object key, Runnable r, long delayMs) {
+    protected void scheduleTask(Object key, SafeRunnable r, long delayMs) {
         closeLock.readLock().lock();
         try {
             if (closed) {
                 return;
             }
-            scheduler.schedule(key, r, delayMs, TimeUnit.MILLISECONDS);
+            scheduler.scheduleOrdered(key, r, delayMs, TimeUnit.MILLISECONDS);
         } finally {
             closeLock.readLock().unlock();
         }
     }
 
-    protected void submitTask(Object key, Runnable r) {
+    protected void submitTask(Object key, SafeRunnable r) {
         closeLock.readLock().lock();
         try {
             if (closed) {
                 return;
             }
-            scheduler.submit(key, r);
+            scheduler.submitOrdered(key, r);
         } finally {
             closeLock.readLock().unlock();
         }
@@ -474,12 +475,9 @@ void notifyLogStreamDeleted(String logSegmentsPath,
         if (null == listeners) {
             return;
         }
-        this.submitTask(logSegmentsPath, new Runnable() {
-            @Override
-            public void run() {
-                for (LogSegmentNamesListener listener : listeners.keySet()) {
-                    listener.onLogStreamDeleted();
-                }
+        this.submitTask(logSegmentsPath, () -> {
+            for (LogSegmentNamesListener listener : listeners.keySet()) {
+                listener.onLogStreamDeleted();
             }
         });
 
@@ -491,12 +489,9 @@ void notifyLogSegmentsUpdated(String logSegmentsPath,
         if (null == listeners) {
             return;
         }
-        this.submitTask(logSegmentsPath, new Runnable() {
-            @Override
-            public void run() {
-                for (VersionedLogSegmentNamesListener listener : listeners.values()) {
-                    listener.onSegmentsUpdated(segments);
-                }
+        this.submitTask(logSegmentsPath, () -> {
+            for (VersionedLogSegmentNamesListener listener : listeners.values()) {
+                listener.onSegmentsUpdated(segments);
             }
         });
     }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKNamespaceWatcher.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKNamespaceWatcher.java
index ec0bc5b26..9d9783f4f 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKNamespaceWatcher.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKNamespaceWatcher.java
@@ -25,11 +25,11 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.callback.NamespaceListener;
 import org.apache.distributedlog.namespace.NamespaceWatcher;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
index 5dd6a88d7..d208aabd2 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
@@ -42,6 +42,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.callback.NamespaceListener;
@@ -51,7 +52,6 @@
 import org.apache.distributedlog.impl.ZKNamespaceWatcher;
 import org.apache.distributedlog.metadata.LogMetadataStore;
 import org.apache.distributedlog.namespace.NamespaceWatcher;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index be609a077..d62c8e73b 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -39,6 +39,8 @@
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.DistributedLogConfiguration;
@@ -51,18 +53,17 @@
 import org.apache.distributedlog.exceptions.ReadCancelledException;
 import org.apache.distributedlog.injector.AsyncFailureInjector;
 import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * BookKeeper ledger based log segment entry reader.
  */
-public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, AsyncCallback.OpenCallback {
+public class BKLogSegmentEntryReader implements SafeRunnable, LogSegmentEntryReader, AsyncCallback.OpenCallback {
 
     private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryReader.class);
 
-    private class CacheEntry implements Runnable, AsyncCallback.ReadCallback,
+    private class CacheEntry implements SafeRunnable, AsyncCallback.ReadCallback,
             AsyncCallback.ReadLastConfirmedAndEntryCallback {
 
         protected final long entryId;
@@ -226,7 +227,7 @@ boolean checkReturnCodeAndHandleFailure(int rc, boolean isLongPoll) {
                     || (isLongPoll && BKException.Code.NoSuchLedgerExistsException == rc)) {
                 int numErrors = Math.max(1, numReadErrors.incrementAndGet());
                 int nextReadBackoffTime = Math.min(numErrors * readAheadWaitTime, maxReadBackoffTime);
-                scheduler.schedule(
+                scheduler.scheduleOrdered(
                         getSegment().getLogSegmentId(),
                         this,
                         nextReadBackoffTime,
@@ -238,7 +239,7 @@ boolean checkReturnCodeAndHandleFailure(int rc, boolean isLongPoll) {
         }
 
         @Override
-        public void run() {
+        public void safeRun() {
             issueRead(this);
         }
     }
@@ -480,12 +481,11 @@ private void failOrRetryOpenLedger(int rc, final LogSegmentMetadata segment) {
             return;
         }
         // the reader is still catching up, retry opening the log segment later
-        scheduler.schedule(segment.getLogSegmentId(), new Runnable() {
-            @Override
-            public void run() {
-                onLogSegmentMetadataUpdated(segment);
-            }
-        }, conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS);
+        scheduler.scheduleOrdered(
+            segment.getLogSegmentId(),
+            () -> onLogSegmentMetadataUpdated(segment),
+            conf.getZKRetryBackoffStartMillis(),
+            TimeUnit.MILLISECONDS);
     }
 
     //
@@ -684,7 +684,7 @@ private void processReadRequests() {
 
         long prevCount = scheduleCount.getAndIncrement();
         if (0 == prevCount) {
-            scheduler.submit(getSegment().getLogSegmentId(), this);
+            scheduler.submitOrdered(getSegment().getLogSegmentId(), this);
         }
     }
 
@@ -692,7 +692,7 @@ private void processReadRequests() {
      * The core function to propagate fetched entries to read requests.
      */
     @Override
-    public void run() {
+    public void safeRun() {
         long scheduleCountLocal = scheduleCount.get();
         while (true) {
             PendingReadRequest nextRequest = null;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
index c4ffbe48d..aaff62e23 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
@@ -26,6 +26,7 @@
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.DistributedLogConfiguration;
@@ -45,7 +46,6 @@
 import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.Allocator;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 031b8a050..1e36356b2 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -43,6 +43,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -72,7 +73,6 @@
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.zk.LimitedPermitManager;
@@ -133,9 +133,9 @@ public ZKLogStreamMetadataStore(String clientId,
 
     private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
         if (createIfNull && null == lockStateExecutor) {
-            lockStateExecutor = OrderedScheduler.newBuilder()
+            lockStateExecutor = OrderedScheduler.newSchedulerBuilder()
                     .name("DLM-LockState")
-                    .corePoolSize(conf.getNumLockStateThreads())
+                    .numThreads(conf.getNumLockStateThreads())
                     .build();
         }
         return lockStateExecutor;
@@ -299,7 +299,7 @@ public void processResult(final int rc, final String path, Object ctx, String na
                     conf.getLockTimeoutMilliSeconds(),
                     statsLogger.scope("read_lock"));
                 return lock;
-            }, scheduler.chooseExecutor(readLockPath));
+            }, scheduler.chooseThread(readLockPath));
     }
 
     //
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
index 47f032fc5..b0d1f7fd4 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
@@ -21,8 +21,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index 1d18417a9..81d721b9e 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -34,7 +35,6 @@
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -142,7 +142,7 @@ private synchronized void checkLockState() throws LockingException {
             if (null == throwable || !(throwable instanceof CancellationException)) {
                 return;
             }
-            lockStateExecutor.submit(lockPath, () -> asyncClose());
+            lockStateExecutor.submitOrdered(lockPath, () -> asyncClose());
         });
         final Stopwatch stopwatch = Stopwatch.createStarted();
         promise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
@@ -162,12 +162,8 @@ public void onFailure(Throwable cause) {
             }
         });
         this.lockAcquireFuture = promise;
-        lockStateExecutor.submit(lockPath, new Runnable() {
-            @Override
-            public void run() {
-                doAsyncAcquireWithSemaphore(promise, lockTimeout);
-            }
-        });
+        lockStateExecutor.submitOrdered(
+            lockPath, () -> doAsyncAcquireWithSemaphore(promise, lockTimeout));
         return promise;
     }
 
@@ -218,7 +214,7 @@ public void onSuccess(SessionLock lock) {
             public void onFailure(Throwable cause) {
                 FutureUtils.completeExceptionally(acquirePromise, cause);
             }
-        }, lockStateExecutor.chooseExecutor(lockPath));
+        }, lockStateExecutor.chooseThread(lockPath));
     }
 
     void asyncTryLock(SessionLock lock,
@@ -251,7 +247,7 @@ public void onSuccess(LockWaiter waiter) {
                 public void onFailure(Throwable cause) {
                     FutureUtils.completeExceptionally(acquirePromise, cause);
                 }
-            }, lockStateExecutor.chooseExecutor(lockPath));
+            }, lockStateExecutor.chooseThread(lockPath));
     }
 
     void waitForAcquire(final LockWaiter waiter,
@@ -273,7 +269,7 @@ public void onSuccess(Boolean acquired) {
                 public void onFailure(Throwable cause) {
                     FutureUtils.completeExceptionally(acquirePromise, cause);
                 }
-            }, lockStateExecutor.chooseExecutor(lockPath));
+            }, lockStateExecutor.chooseThread(lockPath));
     }
 
     /**
@@ -369,7 +365,7 @@ public void onSuccess(Boolean value) {
                     public void onFailure(Throwable cause) {
                         unlockInternalLock(closePromise);
                     }
-                }, lockStateExecutor.chooseExecutor(lockPath));
+                }, lockStateExecutor.chooseThread(lockPath));
             waiter.getAcquireFuture().cancel(true);
         }
     }
@@ -389,7 +385,7 @@ public void onSuccess(LockWaiter waiter) {
                     public void onFailure(Throwable cause) {
                         unlockInternalLock(closePromise);
                     }
-                }, lockStateExecutor.chooseExecutor(lockPath));
+                }, lockStateExecutor.chooseThread(lockPath));
             tryLockFuture.cancel(true);
         }
     }
@@ -426,25 +422,17 @@ public void onFailure(Throwable cause) {
             private void complete() {
                 FutureUtils.complete(closePromise, null);
             }
-        }, lockStateExecutor.chooseExecutor(lockPath));
-        lockStateExecutor.submit(lockPath, new Runnable() {
-            @Override
-            public void run() {
-                closeWaiter(lockWaiter, closeWaiterFuture);
-            }
-        });
+        }, lockStateExecutor.chooseThread(lockPath));
+        lockStateExecutor.submitOrdered(
+            lockPath, () -> closeWaiter(lockWaiter, closeWaiterFuture));
         return closePromise;
     }
 
     void internalReacquireLock(final AtomicInteger numRetries,
                                final long lockTimeout,
                                final CompletableFuture<ZKDistributedLock> reacquirePromise) {
-        lockStateExecutor.submit(lockPath, new Runnable() {
-            @Override
-            public void run() {
-                doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise);
-            }
-        });
+        lockStateExecutor.submitOrdered(
+            lockPath, () -> doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise));
     }
 
     void doInternalReacquireLock(final AtomicInteger numRetries,
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index 3b42c2867..f018e3602 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -35,6 +35,7 @@
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -50,7 +51,6 @@
 import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -391,7 +391,7 @@ public boolean isLockHeld() {
      *          function to execute a lock action
      */
     protected void executeLockAction(final int lockEpoch, final LockAction func) {
-        lockStateExecutor.submit(lockPath, new SafeRunnable() {
+        lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
                 if (ZKSessionLock.this.epoch.get() == lockEpoch) {
@@ -430,7 +430,7 @@ public void safeRun() {
      */
     protected <T> void executeLockAction(final int lockEpoch,
                                          final LockAction func, final CompletableFuture<T> promise) {
-        lockStateExecutor.submit(lockPath, new SafeRunnable() {
+        lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
                 int currentEpoch = ZKSessionLock.this.epoch.get();
@@ -553,7 +553,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta
                 @Override
                 public void processResult(final int rc, String path, Object ctx,
                                           final List<String> children, Stat stat) {
-                    lockStateExecutor.submit(lockPath, new SafeRunnable() {
+                    lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
                         @Override
                         public void safeRun() {
                             if (!lockState.inState(State.INIT)) {
@@ -583,7 +583,7 @@ public void onSuccess(Pair<String, Long> owner) {
                                             public void onFailure(final Throwable cause) {
                                                 result.completeExceptionally(cause);
                                             }
-                                        }, lockStateExecutor.chooseExecutor(lockPath));
+                                        }, lockStateExecutor.chooseThread(lockPath));
                             } else {
                                 asyncTryLock(wait, result);
                             }
@@ -647,7 +647,7 @@ public void onFailure(Throwable cause) {
     private boolean checkOrClaimLockOwner(final Pair<String, Long> currentOwner,
                                           final CompletableFuture<String> result) {
         if (lockId.compareTo(currentOwner) != 0 && !lockContext.hasLockId(currentOwner)) {
-            lockStateExecutor.submit(lockPath, new SafeRunnable() {
+            lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     result.complete(currentOwner.getLeft());
@@ -877,7 +877,7 @@ synchronized LockWaiter waitForTry(Stopwatch stopwatch, CompletableFuture<LockWa
         // Use lock executor here rather than lock action, because we want this opertaion to be applied
         // whether the epoch has changed or not. The member node is EPHEMERAL_SEQUENTIAL so there's no
         // risk of an ABA problem where we delete and recreate a node and then delete it again here.
-        lockStateExecutor.submit(lockPath, new SafeRunnable() {
+        lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
                 acquireFuture.completeExceptionally(cause);
@@ -967,7 +967,7 @@ public void onFailure(Throwable cause) {
                 LOG.error("lock node delete failed {} {}", lockId, lockPath);
                 promise.complete(null);
             }
-        }, lockStateExecutor.chooseExecutor(lockPath));
+        }, lockStateExecutor.chooseThread(lockPath));
     }
 
     private void deleteLockNode(final CompletableFuture<Void> promise) {
@@ -979,7 +979,7 @@ private void deleteLockNode(final CompletableFuture<Void> promise) {
         zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(final int rc, final String path, Object ctx) {
-                lockStateExecutor.submit(lockPath, new SafeRunnable() {
+                lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         if (KeeperException.Code.OK.intValue() == rc) {
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java
index 9de94d567..030379c7e 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java
@@ -24,10 +24,11 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.util.OrderedScheduler;
 
 /**
  * Factory to create zookeeper based locks.
@@ -88,9 +89,9 @@ void createLock(final String lockPath,
                     final AtomicInteger numRetries,
                     final CompletableFuture<SessionLock> createPromise,
                     final long delayMs) {
-        lockStateExecutor.schedule(lockPath, new Runnable() {
+        lockStateExecutor.scheduleOrdered(lockPath, new SafeRunnable() {
             @Override
-            public void run() {
+            public void safeRun() {
                 if (null != interruptedException.get()) {
                     createPromise.completeExceptionally(interruptedException.get());
                     return;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
index 84f8ff3d3..9c61be391 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
@@ -20,6 +20,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.DistributedLogConfiguration;
@@ -31,7 +32,6 @@
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
 import org.apache.distributedlog.metadata.LogMetadataStore;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
-import org.apache.distributedlog.util.OrderedScheduler;
 
 /**
  * Manager to manage all the stores required by a namespace.
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index 6a703136e..14c751e3a 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -38,6 +38,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.api.AsyncLogReader;
 import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.api.DistributedLogManager;
@@ -64,7 +65,6 @@
 import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
 import org.apache.distributedlog.metadata.MetadataUpdater;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -1023,9 +1023,9 @@ public void testTruncationValidation() throws Exception {
         ZooKeeperClient zookeeperClient = TestZooKeeperClientBuilder.newBuilder()
             .uri(uri)
             .build();
-        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("test-truncation-validation")
-                .corePoolSize(1)
+                .numThreads(1)
                 .build();
         DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
         confLocal.loadConf(conf);
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
index 6ad3c0f59..7b18bcf9b 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
@@ -33,6 +33,7 @@
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -49,7 +50,6 @@
 import org.apache.distributedlog.lock.ZKDistributedLock;
 import org.apache.distributedlog.lock.ZKSessionLockFactory;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -80,8 +80,8 @@
     @Override
     public void setup() throws Exception {
         super.setup();
-        scheduler = OrderedScheduler.newBuilder().corePoolSize(1).build();
-        lockStateExecutor = OrderedScheduler.newBuilder().corePoolSize(1).build();
+        scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
+        lockStateExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
         // build zookeeper client
         URI uri = createDLMURI("");
         zkc = TestZooKeeperClientBuilder.newBuilder(conf)
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
index 37f692dd2..903e6843e 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -51,7 +52,6 @@
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -186,8 +186,8 @@ public BKDistributedLogManager createNewDLM(DistributedLogConfiguration conf,
                 .uri(uri)
                 .conf(conf)
                 .build();
-        final OrderedScheduler scheduler = OrderedScheduler.newBuilder()
-                .corePoolSize(1)
+        final OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(1)
                 .name("test-scheduler")
                 .build();
         AsyncCloseable resourcesCloseable = new AsyncCloseable() {
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
index 18af307cd..517b0dfcd 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.distributedlog.api.AsyncLogWriter;
@@ -41,7 +42,6 @@
 import org.apache.distributedlog.injector.AsyncFailureInjector;
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.junit.After;
 import org.junit.Before;
@@ -88,9 +88,9 @@ public void setup() throws Exception {
                 .ledgersPath("/ledgers")
                 .zkServers(bkutil.getZkServers())
                 .build();
-        scheduler = OrderedScheduler.newBuilder()
+        scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("test-read-ahead-entry-reader")
-                .corePoolSize(1)
+                .numThreads(1)
                 .build();
     }
 
@@ -138,11 +138,15 @@ private ReadAheadEntryReader createEntryReader(String streamName,
 
     private void ensureOrderSchedulerEmpty(String streamName) throws Exception {
         final CompletableFuture<Void> promise = new CompletableFuture<Void>();
-        scheduler.submit(streamName, new Runnable() {
-            @Override
-            public void run() {
-                FutureUtils.complete(promise, null);
-            }
+        scheduler.submitOrdered(streamName, () -> {
+            FutureUtils.complete(promise, null);
+            // the following line is needed for oraclejdk9 to avoid following exception
+            // ```
+            // incompatible types: inference variable T has incompatible bounds
+            // upper bounds: java.lang.Object
+            // lower bounds: void
+            // ```
+            return (Void) null;
         });
         Utils.ioResult(promise);
     }
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
index 42ff9aea6..85efb2261 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
@@ -40,7 +41,6 @@
 import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
 import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.junit.After;
@@ -114,9 +114,9 @@ public void testCheckAndRepairDLNamespace() throws Exception {
                 .conf(confLocal)
                 .uri(uri)
                 .build();
-        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("dlck-tool")
-                .corePoolSize(1)
+                .numThreads(1)
                 .build();
         ExecutorService executorService = Executors.newCachedThreadPool();
 
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
index 161222033..2ba24db47 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
@@ -24,11 +24,11 @@
 import com.google.common.collect.Sets;
 import java.net.URI;
 import java.util.Set;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -60,9 +60,9 @@ public void setup() throws Exception {
                 .uri(createDLMURI("/"))
                 .sessionTimeoutMs(zkSessionTimeoutMs)
                 .build();
-        scheduler = OrderedScheduler.newBuilder()
+        scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("test-zk-logmetadata-store")
-                .corePoolSize(1)
+                .numThreads(1)
                 .build();
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
         conf.addConfiguration(baseConf);
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index eda73a78c..c26a7f0a0 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -49,7 +50,6 @@
 import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
@@ -109,9 +109,9 @@ public void setup() throws Exception {
                 .uri(createDLMURI("/"))
                 .sessionTimeoutMs(zkSessionTimeoutMs)
                 .build();
-        scheduler = OrderedScheduler.newBuilder()
+        scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("test-zk-logsegment-metadata-store")
-                .corePoolSize(1)
+                .numThreads(1)
                 .build();
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
         conf.addConfiguration(baseConf);
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java
index e2761b218..29b42bb79 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java
@@ -27,13 +27,13 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClientUtils;
 import org.apache.distributedlog.callback.NamespaceListener;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -65,9 +65,9 @@ public void setup() throws Exception {
                 .uri(createDLMURI("/"))
                 .sessionTimeoutMs(zkSessionTimeoutMs)
                 .build();
-        scheduler = OrderedScheduler.newBuilder()
+        scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("test-zk-namespace-watcher")
-                .corePoolSize(1)
+                .numThreads(1)
                 .build();
     }
 
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
index d2c67995e..f32f4ce3b 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
@@ -45,7 +46,6 @@
 import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.metadata.LogMetadataStore;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.junit.After;
 import org.junit.Before;
@@ -131,9 +131,9 @@ public void setup() throws Exception {
                 .uri(createDLMURI("/"))
                 .sessionTimeoutMs(zkSessionTimeoutMs)
                 .build();
-        scheduler = OrderedScheduler.newBuilder()
+        scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("test-zk-logmetadata-store")
-                .corePoolSize(2)
+                .numThreads(2)
                 .build();
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
         conf.addConfiguration(baseConf);
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
index 5a2ffd134..a05dd6e2b 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.BookKeeperClientBuilder;
@@ -48,7 +49,6 @@
 import org.apache.distributedlog.injector.AsyncFailureInjector;
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.junit.After;
 import org.junit.Before;
@@ -85,9 +85,9 @@ public void setup() throws Exception {
                 .ledgersPath("/ledgers")
                 .zkServers(bkutil.getZkServers())
                 .build();
-        scheduler = OrderedScheduler.newBuilder()
+        scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("test-bk-logsegment-entry-reader")
-                .corePoolSize(1)
+                .numThreads(1)
                 .build();
     }
 
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index fdd9f492f..018293071 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -49,6 +49,7 @@
 import java.util.Collections;
 import java.util.List;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.LongVersion;
@@ -70,7 +71,6 @@
 import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
@@ -204,9 +204,9 @@ public void setup() throws Exception {
         } catch (KeeperException.NodeExistsException nee) {
             logger.debug("The namespace uri already exists.");
         }
-        scheduler = OrderedScheduler.newBuilder()
+        scheduler = OrderedScheduler.newSchedulerBuilder()
             .name("test-scheduler")
-            .corePoolSize(1)
+            .numThreads(1)
             .build();
         metadataStore = new ZKLogStreamMetadataStore(
             "test-logstream-metadata-store",
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java
index bd3b71bd6..89c3817c7 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.distributedlog.TestDistributedLogBase;
@@ -42,7 +43,6 @@
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -86,9 +86,9 @@ public void setup() throws Exception {
                 .sessionTimeoutMs(sessionTimeoutMs)
                 .zkAclId(null)
                 .build();
-        lockStateExecutor = OrderedScheduler.newBuilder()
+        lockStateExecutor = OrderedScheduler.newSchedulerBuilder()
                 .name("test-scheduer")
-                .corePoolSize(1)
+                .numThreads(1)
                 .build();
     }
 
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
index 498f7f226..ad1ebc175 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
@@ -38,8 +38,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -50,7 +50,6 @@
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.lock.ZKSessionLock.State;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -64,10 +63,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
-
-
 /**
  * Distributed Lock Tests.
  */
@@ -100,8 +95,8 @@ public void setup() throws Exception {
                 .zkServers(zkServers)
                 .zkAclId(null)
                 .build();
-        lockStateExecutor = OrderedScheduler.newBuilder()
-                .corePoolSize(1)
+        lockStateExecutor = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(1)
                 .build();
     }
 
@@ -784,12 +779,7 @@ public void onExpired() {
         // expire session
         ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
         // submit a runnable to lock state executor to ensure any state changes happened when session expired
-        lockStateExecutor.submit(lockPath, new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                expiredLatch.countDown();
-            }
-        });
+        lockStateExecutor.submitOrdered(lockPath, () -> expiredLatch.countDown());
         expiredLatch.await();
         // no watcher was registered if never acquired lock successfully
         assertEquals(State.INIT, lock.getLockState());
@@ -1226,12 +1216,7 @@ private void testLockWhenSiblingUseSameLockId(long timeout, final boolean isUnlo
         } else {
             ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
             final CountDownLatch latch = new CountDownLatch(1);
-            lockStateExecutor.submit(lockPath, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    latch.countDown();
-                }
-            });
+            lockStateExecutor.submitOrdered(lockPath, () -> latch.countDown());
             latch.await();
             children = getLockWaiters(zkc, lockPath);
             assertEquals(0, children.size());
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
index ba60e38a9..c21068fb2 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
@@ -25,6 +25,7 @@
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
@@ -35,7 +36,6 @@
 import org.apache.distributedlog.ZooKeeperClusterTestCase;
 import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
-import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -62,9 +62,9 @@
 
     @Before
     public void setup() throws Exception {
-        scheduler = OrderedScheduler.newBuilder()
+        scheduler = OrderedScheduler.newSchedulerBuilder()
                 .name("test-logsegment-metadata-store-updater")
-                .corePoolSize(1)
+                .numThreads(1)
                 .build();
         zkc = TestZooKeeperClientBuilder.newBuilder()
                 .uri(createURI("/"))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services