Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2018/04/03 03:03:20 UTC
[bookkeeper] branch master updated: Refactored OrderedSafeExecutor and OrderedScheduler
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 46171e6 Refactored OrderedSafeExecutor and OrderedScheduler
46171e6 is described below
commit 46171e67e526702487641438144f28b7eb1aa07b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Apr 2 20:03:12 2018 -0700
Refactored OrderedSafeExecutor and OrderedScheduler
As outlined in https://lists.apache.org/thread.html/102383ea42f473f36720637e41af0ee83fc38d9f992736e0d1a7f985%3Cdev.bookkeeper.apache.org%3E
right now `OrderedSafeExecutor` is implemented on top of `OrderedScheduler`. There are a few problems with this approach that hurt performance:
1. `OrderedScheduler` is a `ScheduledExecutorService`, which uses a priority queue for tasks. The priority queue has a single mutex shared by both publishers and consumers of the queue.
2. Many objects are created for each task submission, due to the listenable future decorators.
Since the critical write/read path never needs delayed task execution or futures, we should use a lightweight executor there.
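The lightweight path described above can be sketched roughly as follows. This is a minimal illustration, not the code in this commit: the class `OrderedExecutionSketch` and its methods are hypothetical, and `Math.floorMod` is assumed to be an acceptable stand-in for the project's `MathUtils.signSafeMod`. Each slot is a single-threaded `ThreadPoolExecutor` backed by a `LinkedBlockingQueue`, so tasks sharing a key run in submission order without passing through a scheduler's priority queue and without a future being allocated per task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-threaded executor per slot, keys hashed to slots.
class OrderedExecutionSketch {
    private final ExecutorService[] threads;

    OrderedExecutionSketch(int numThreads) {
        threads = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            // Plain FIFO queue instead of the delay/priority queue of a scheduled executor.
            threads[i] = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }
    }

    // Same key -> same slot -> same thread, so per-key ordering is preserved.
    void executeOrdered(long orderingKey, Runnable task) {
        int slot = (int) Math.floorMod(orderingKey, (long) threads.length);
        threads[slot].execute(task);
    }

    void shutdownAndWait() {
        for (ExecutorService t : threads) {
            t.shutdown();
        }
        try {
            for (ExecutorService t : threads) {
                t.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Submits 100 tasks under one key and returns the order in which they ran.
    static List<Integer> runDemo() {
        OrderedExecutionSketch executor = new OrderedExecutionSketch(4);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 100; i++) {
            final int n = i;
            executor.executeOrdered(7L, () -> seen.add(n));
        }
        executor.shutdownAndWait();
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because `execute()` returns nothing, the only per-task allocation in this sketch is the `Runnable` itself and its queue node.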
### Modifications
* Inverted the hierarchy between `OrderedSafeExecutor` and `OrderedScheduler`. `OrderedSafeExecutor` is now the base class and `OrderedScheduler` extends it, since the scheduler provides additional methods.
* Moved `OrderedSafeExecutor` into `bookkeeper-common`, since `OrderedScheduler` was already there.
* Moved `OrderedSafeGenericCallback` into its own file, since it needs to be in the `bookkeeper-server` module at this point.
* Renamed some methods from `submitOrdered()` to `executeOrdered()` to be consistent with JDK naming (`submit()` returns a future while `execute()` returns void).
* Changed the `scheduler` instance in `BookKeeper` to an `OrderedScheduler`, so that the few cases that were using the `mainWorkerPool` could be easily converted to use the scheduler instead.
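
The `submit()` versus `execute()` distinction behind the rename can be illustrated with a small sketch. It assumes the JDK's `CompletableFuture` in place of the Guava `SettableFuture` that the diff uses, and the class `ExecuteVsSubmitSketch` with its `submitVia` helper is hypothetical. It shows a future-returning call layered on top of a void `execute()`, so the future is only allocated when a caller actually asks for one.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a future-returning submit built on a void execute().
class ExecuteVsSubmitSketch {

    static <T> CompletableFuture<T> submitVia(Executor executor, Callable<T> task) {
        CompletableFuture<T> future = new CompletableFuture<>();
        // execute() returns void; the result is delivered through the future.
        executor.execute(() -> {
            try {
                future.complete(task.call());
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    // Runs one task through submitVia and waits for its result.
    static int demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return submitVia(executor, () -> 21 * 2).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```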
Author: Matteo Merli <mm...@apache.org>
Reviewers: Andrey Yegorov <None>, Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1309 from merlimat/refactor-ordered-executor and squashes the following commits:
9af6eadc2 [Matteo Merli] Fixed findbugs configuration to ignore generated JMH code
c85ad21e5 [Matteo Merli] Use ImmutableMap
22eba18c8 [Matteo Merli] Added microbenchmark
2c67d0e56 [Matteo Merli] Fix for TestMaxSizeWorkersQueue
54a72ba1b [Matteo Merli] Fix for TestReadAheadEntryReader
019d5202d [Matteo Merli] More fixes for submitOrdered
b8175f311 [Matteo Merli] Fixed array initialization type
5d68d431d [Matteo Merli] Fixed class names in integration tests
7c529dde7 [Matteo Merli] Fixed submitOrdered in DL code
c91259b7d [Matteo Merli] Removed "final" modifier on checkQueue
c5f70d5b4 [Matteo Merli] Merge remote-tracking branch 'apache/master' into refactor-ordered-executor
957c98309 [Matteo Merli] Correctly handle number of tasks in queue check
e45f78c09 [Matteo Merli] Renamed into OrderedExecutor and OrderedGenericCallback
926868429 [Matteo Merli] Refactored OrderedSafeExecutor and OrderedScheduler
---
.../apache/bookkeeper/benchmark/BenchBookie.java | 4 +-
.../common/util/BoundedExecutorService.java | 109 +++++
.../util/BoundedScheduledExecutorService.java | 30 +-
...{OrderedScheduler.java => OrderedExecutor.java} | 457 +++++++--------------
.../bookkeeper/common/util/OrderedScheduler.java | 431 ++-----------------
.../org/apache/bookkeeper/common/util/Retries.java | 2 +-
.../org/apache/bookkeeper/bookie/BookieShell.java | 9 +-
.../org/apache/bookkeeper/client/BookKeeper.java | 16 +-
.../bookkeeper/client/ExplicitLacFlushPolicy.java | 2 +-
.../apache/bookkeeper/client/LedgerDeleteOp.java | 4 +-
.../client/LedgerFragmentReplicator.java | 4 +-
.../org/apache/bookkeeper/client/LedgerHandle.java | 29 +-
.../apache/bookkeeper/client/LedgerHandleAdv.java | 2 +-
.../org/apache/bookkeeper/client/LedgerOpenOp.java | 4 +-
.../org/apache/bookkeeper/client/PendingAddOp.java | 2 +-
.../apache/bookkeeper/client/PendingReadOp.java | 2 +-
.../bookkeeper/client/ReadOnlyLedgerHandle.java | 2 +-
.../org/apache/bookkeeper/proto/BookieClient.java | 16 +-
.../bookkeeper/proto/BookieRequestProcessor.java | 41 +-
.../bookkeeper/proto/PerChannelBookieClient.java | 18 +-
.../bookkeeper/util/OrderedGenericCallback.java | 77 ++++
.../bookkeeper/util/OrderedSafeExecutor.java | 280 -------------
.../bookkeeper/client/MockBookKeeperTestCase.java | 28 +-
.../client/TestGetBookieInfoTimeout.java | 6 +-
.../bookkeeper/proto/TestBackwardCompatCMS42.java | 6 +-
.../proto/TestPerChannelBookieClient.java | 16 +-
.../apache/bookkeeper/test/BookieClientTest.java | 6 +-
.../main/resources/bookkeeper/findbugsExclude.xml | 3 +
.../common/OrderedExecutorBenchmark.java | 82 ++++
.../apache/distributedlog/BKAsyncLogReader.java | 3 +-
.../distributedlog/ReadAheadEntryReader.java | 4 +-
.../impl/ZKLogSegmentMetadataStore.java | 2 +-
.../impl/logsegment/BKLogSegmentEntryReader.java | 2 +-
.../distributedlog/lock/ZKDistributedLock.java | 11 +-
.../apache/distributedlog/lock/ZKSessionLock.java | 14 +-
.../distributedlog/TestReadAheadEntryReader.java | 11 +-
.../distributedlog/lock/TestZKSessionLock.java | 4 +-
.../impl/sc/FailRequestStorageContainer.java | 2 +-
.../shaded/BookKeeperServerShadedJarTest.java | 2 +-
.../shaded/DistributedLogCoreShadedJarTest.java | 4 +-
40 files changed, 610 insertions(+), 1137 deletions(-)
diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index db8ccbf..3efff7a 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
@@ -38,7 +39,6 @@ import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -165,7 +165,7 @@ public class BenchBookie {
eventLoop = new NioEventLoopGroup();
}
- OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+ OrderedExecutor executor = OrderedExecutor.newBuilder()
.name("BenchBookieClientScheduler")
.numThreads(1)
.build();
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java
new file mode 100644
index 0000000..655d49d
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implements {@link ExecutorService} and allows limiting the number of tasks to
+ * be scheduled in the thread's queue.
+ */
+public class BoundedExecutorService extends ForwardingExecutorService {
+ private final BlockingQueue<Runnable> queue;
+ private final ThreadPoolExecutor thread;
+ private final int maxTasksInQueue;
+
+ public BoundedExecutorService(ThreadPoolExecutor thread, int maxTasksInQueue) {
+ this.queue = thread.getQueue();
+ this.thread = thread;
+ this.maxTasksInQueue = maxTasksInQueue;
+ }
+
+ @Override
+ protected ExecutorService delegate() {
+ return this.thread;
+ }
+
+ private void checkQueue(int numberOfTasks) {
+ if (maxTasksInQueue > 0 && (queue.size() + numberOfTasks) > maxTasksInQueue) {
+ throw new RejectedExecutionException("Queue at limit of " + maxTasksInQueue + " items");
+ }
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ checkQueue(tasks.size());
+ return super.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ checkQueue(tasks.size());
+ return super.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ checkQueue(tasks.size());
+ return super.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ checkQueue(tasks.size());
+ return super.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ checkQueue(1);
+ super.execute(command);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ checkQueue(1);
+ return super.submit(task);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ checkQueue(1);
+ return super.submit(task);
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ checkQueue(1);
+ return super.submit(task, result);
+ }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
index 482ca18..44c6f38 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
@@ -59,83 +59,83 @@ public class BoundedScheduledExecutorService extends ForwardingListeningExecutor
@Override
public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- this.checkQueue();
+ this.checkQueue(1);
return this.thread.schedule(command, delay, unit);
}
@Override
public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- this.checkQueue();
+ this.checkQueue(1);
return this.thread.schedule(callable, delay, unit);
}
@Override
public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit) {
- this.checkQueue();
+ this.checkQueue(1);
return this.thread.scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay, long delay, TimeUnit unit) {
- this.checkQueue();
+ this.checkQueue(1);
return this.thread.scheduleAtFixedRate(command, initialDelay, delay, unit);
}
@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
- this.checkQueue();
+ this.checkQueue(1);
return super.submit(task);
}
@Override
public ListenableFuture<?> submit(Runnable task) {
- this.checkQueue();
+ this.checkQueue(1);
return super.submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
- this.checkQueue();
+ this.checkQueue(tasks.size());
return super.invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException {
- this.checkQueue();
+ this.checkQueue(tasks.size());
return super.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
- this.checkQueue();
+ this.checkQueue(tasks.size());
return super.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- this.checkQueue();
+ this.checkQueue(tasks.size());
return super.invokeAny(tasks, timeout, unit);
}
@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
- this.checkQueue();
+ this.checkQueue(1);
return super.submit(task, result);
}
@Override
public void execute(Runnable command) {
- this.checkQueue();
+ this.checkQueue(1);
super.execute(command);
}
- private void checkQueue() {
- if (this.maxTasksInQueue > 0 && this.queue.size() >= this.maxTasksInQueue) {
- throw new RejectedExecutionException("Queue at limit of " + this.maxTasksInQueue + " items");
+ private void checkQueue(int numberOfTasks) {
+ if (maxTasksInQueue > 0 && (queue.size() + numberOfTasks) > maxTasksInQueue) {
+ throw new RejectedExecutionException("Queue at limit of " + maxTasksInQueue + " items");
}
}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
similarity index 54%
copy from bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
index bf2a6fb..0f634d0 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,7 +20,7 @@ package org.apache.bookkeeper.common.util;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -31,13 +31,16 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -45,7 +48,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang.StringUtils;
/**
- * This class provides 2 things over the java {@link ScheduledExecutorService}.
+ * This class provides 2 things over the java {@link ExecutorService}.
*
* <p>1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
* This means that exceptions in scheduled tasks wont go unnoticed and will be
@@ -58,12 +61,13 @@ import org.apache.commons.lang.StringUtils;
* achieved by hashing the key objects to threads by their {@link #hashCode()}
* method.
*/
-public class OrderedScheduler implements ScheduledExecutorService {
+@Slf4j
+public class OrderedExecutor implements ExecutorService {
public static final int NO_TASK_LIMIT = -1;
protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
final String name;
- final ListeningScheduledExecutorService threads[];
+ final ExecutorService threads[];
final long threadIds[];
final Random rand = new Random();
final OpStatsLogger taskExecutionStats;
@@ -72,24 +76,30 @@ public class OrderedScheduler implements ScheduledExecutorService {
final long warnTimeMicroSec;
final int maxTasksInQueue;
- /**
- * Create a builder to build ordered scheduler.
- *
- * @return builder to build ordered scheduler.
- */
- public static SchedulerBuilder newSchedulerBuilder() {
- return new SchedulerBuilder();
+
+ public static Builder newBuilder() {
+ return new Builder();
}
/**
- * Builder to build ordered scheduler.
+ * A builder class for an OrderedExecutor.
*/
- public static class SchedulerBuilder extends AbstractBuilder<OrderedScheduler> {}
+ public static class Builder extends AbstractBuilder<OrderedExecutor> {
+
+ @Override
+ public OrderedExecutor build() {
+ if (null == threadFactory) {
+ threadFactory = new DefaultThreadFactory("bookkeeper-ordered-safe-executor");
+ }
+ return new OrderedExecutor(name, numThreads, threadFactory, statsLogger,
+ traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
+ }
+ }
/**
* Abstract builder class to build {@link OrderedScheduler}.
*/
- public abstract static class AbstractBuilder<T extends OrderedScheduler> {
+ public abstract static class AbstractBuilder<T extends OrderedExecutor> {
protected String name = getClass().getSimpleName();
protected int numThreads = Runtime.getRuntime().availableProcessors();
protected ThreadFactory threadFactory = null;
@@ -138,7 +148,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
if (null == threadFactory) {
threadFactory = new DefaultThreadFactory(name);
}
- return (T) new OrderedScheduler(
+ return (T) new OrderedExecutor(
name,
numThreads,
threadFactory,
@@ -147,34 +157,42 @@ public class OrderedScheduler implements ScheduledExecutorService {
warnTimeMicroSec,
maxTasksInQueue);
}
-
}
- private class TimedRunnable implements SafeRunnable {
- final SafeRunnable runnable;
+ /**
+ * Decorator class for a runnable that measure the execution time.
+ */
+ protected class TimedRunnable implements Runnable {
+ final Runnable runnable;
final long initNanos;
- TimedRunnable(SafeRunnable runnable) {
+ TimedRunnable(Runnable runnable) {
this.runnable = runnable;
this.initNanos = MathUtils.nowInNano();
}
@Override
- public void safeRun() {
- taskPendingStats.registerSuccessfulEvent(
- MathUtils.elapsedNanos(initNanos),
- TimeUnit.NANOSECONDS);
+ public void run() {
+ taskPendingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(initNanos), TimeUnit.NANOSECONDS);
long startNanos = MathUtils.nowInNano();
- this.runnable.safeRun();
+ this.runnable.run();
long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
if (elapsedMicroSec >= warnTimeMicroSec) {
- LOGGER.warn("Runnable {}:{} took too long {} micros to execute.",
- runnable, runnable.getClass(), elapsedMicroSec);
+ log.warn("Runnable {}:{} took too long {} micros to execute.", runnable, runnable.getClass(),
+ elapsedMicroSec);
}
}
}
+ protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
+ return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
+ }
+
+ protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
+ return new BoundedExecutorService(executor, this.maxTasksInQueue);
+ }
+
/**
* Constructs Safe executor.
*
@@ -190,37 +208,30 @@ public class OrderedScheduler implements ScheduledExecutorService {
* - should we stat task execution
* @param warnTimeMicroSec
* - log long task exec warning after this interval
+ * @param maxTasksInQueue
+ * - maximum items allowed in a thread queue. -1 for no limit
*/
- protected OrderedScheduler(String baseName,
- int numThreads,
- ThreadFactory threadFactory,
- StatsLogger statsLogger,
- boolean traceTaskExecution,
- long warnTimeMicroSec,
- int maxTasksInQueue) {
+ protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
+ StatsLogger statsLogger, boolean traceTaskExecution,
+ long warnTimeMicroSec, int maxTasksInQueue) {
checkArgument(numThreads > 0);
checkArgument(!StringUtils.isBlank(baseName));
this.maxTasksInQueue = maxTasksInQueue;
this.warnTimeMicroSec = warnTimeMicroSec;
name = baseName;
- threads = new ListeningScheduledExecutorService[numThreads];
+ threads = new ExecutorService[numThreads];
threadIds = new long[numThreads];
for (int i = 0; i < numThreads; i++) {
- final ScheduledThreadPoolExecutor thread = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder()
- .setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
- .setThreadFactory(threadFactory)
- .build());
- threads[i] = new BoundedScheduledExecutorService(thread, this.maxTasksInQueue);
+ ThreadPoolExecutor thread = createSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
+ .setThreadFactory(threadFactory).build());
+ threads[i] = getBoundedExecutor(thread);
final int idx = i;
try {
- threads[idx].submit(new SafeRunnable() {
- @Override
- public void safeRun() {
- threadIds[idx] = Thread.currentThread().getId();
- }
+ threads[idx].submit(() -> {
+ threadIds[idx] = Thread.currentThread().getId();
}).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -271,7 +282,58 @@ public class OrderedScheduler implements ScheduledExecutorService {
this.traceTaskExecution = traceTaskExecution;
}
- public ListeningScheduledExecutorService chooseThread() {
+ /**
+ * Schedules a one time action to execute with an ordering guarantee on the key.
+ * @param orderingKey
+ * @param r
+ */
+ public void executeOrdered(Object orderingKey, SafeRunnable r) {
+ chooseThread(orderingKey).execute(timedRunnable(r));
+ }
+
+ /**
+ * Schedules a one time action to execute with an ordering guarantee on the key.
+ * @param orderingKey
+ * @param r
+ */
+ public void executeOrdered(long orderingKey, SafeRunnable r) {
+ chooseThread(orderingKey).execute(timedRunnable(r));
+ }
+
+ /**
+ * Schedules a one time action to execute with an ordering guarantee on the key.
+ * @param orderingKey
+ * @param r
+ */
+ public void executeOrdered(int orderingKey, SafeRunnable r) {
+ chooseThread(orderingKey).execute(timedRunnable(r));
+ }
+
+ public <T> ListenableFuture<T> submitOrdered(long orderingKey, Callable<T> task) {
+ SettableFuture<T> future = SettableFuture.create();
+ executeOrdered(orderingKey, () -> {
+ try {
+ T result = task.call();
+ future.set(result);
+ } catch (Throwable t) {
+ future.setException(t);
+ }
+ });
+
+ return future;
+ }
+
+
+ public long getThreadID(long orderingKey) {
+ // skip hashcode generation in this special case
+ if (threadIds.length == 1) {
+ return threadIds[0];
+ }
+
+ return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
+ }
+
+ public ExecutorService chooseThread() {
// skip random # generation in this special case
if (threads.length == 1) {
return threads[0];
@@ -280,7 +342,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
return threads[rand.nextInt(threads.length)];
}
- public ListeningScheduledExecutorService chooseThread(Object orderingKey) {
+ public ExecutorService chooseThread(Object orderingKey) {
// skip hashcode generation in this special case
if (threads.length == 1) {
return threads[0];
@@ -295,7 +357,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
* @param orderingKey long ordering key
* @return the thread for executing this order key
*/
- public ListeningScheduledExecutorService chooseThread(long orderingKey) {
+ public ExecutorService chooseThread(long orderingKey) {
if (threads.length == 1) {
return threads[0];
}
@@ -303,7 +365,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
return threads[MathUtils.signSafeMod(orderingKey, threads.length)];
}
- private SafeRunnable timedRunnable(SafeRunnable r) {
+ private Runnable timedRunnable(Runnable r) {
if (traceTaskExecution) {
return new TimedRunnable(r);
} else {
@@ -312,174 +374,75 @@ public class OrderedScheduler implements ScheduledExecutorService {
}
/**
- * schedules a one time action to execute.
- */
- public void submit(SafeRunnable r) {
- chooseThread().submit(timedRunnable(r));
- }
-
- /**
- * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
- *
- * @param orderingKey order key to submit the task
- * @param r task to run
- * @return listenable future on the completion of the task
- */
- public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
- return chooseThread(orderingKey).submit(timedRunnable(r));
- }
-
- /**
- * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
- *
- * @param orderingKey order key to submit the task
- * @param r task to run
- */
- public void executeOrdered(Object orderingKey, SafeRunnable r) {
- chooseThread(orderingKey).execute(timedRunnable(r));
- }
-
- /**
- * schedules a one time action to execute with an ordering guarantee on the key.
- *
- * @param orderingKey
- * @param r
- */
- public void submitOrdered(long orderingKey, SafeRunnable r) {
- chooseThread(orderingKey).execute(timedRunnable(r));
- }
-
- /**
- * schedules a one time action to execute with an ordering guarantee on the key.
- *
- * @param orderingKey
- * @param r
+ * {@inheritDoc}
*/
- public void submitOrdered(int orderingKey, SafeRunnable r) {
- chooseThread(orderingKey).execute(timedRunnable(r));
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return chooseThread().submit(task);
}
/**
- * schedules a one time action to execute with an ordering guarantee on the key.
- *
- * @param orderingKey
- * @param callable
+ * {@inheritDoc}
*/
- public <T> ListenableFuture<T> submitOrdered(Object orderingKey,
- Callable<T> callable) {
- return chooseThread(orderingKey).submit(callable);
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return chooseThread().submit(timedRunnable(task), result);
}
/**
- * Creates and executes a one-shot action that becomes enabled after the given delay.
- *
- * @param command - the SafeRunnable to execute
- * @param delay - the time from now to delay execution
- * @param unit - the time unit of the delay parameter
- * @return a ScheduledFuture representing pending completion of the task and whose get() method
- * will return null upon completion
+ * {@inheritDoc}
*/
- public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
- return chooseThread().schedule(command, delay, unit);
+ @Override
+ public Future<?> submit(Runnable task) {
+ return chooseThread().submit(timedRunnable(task));
}
/**
- * Creates and executes a one-shot action that becomes enabled after the given delay.
- *
- * @param orderingKey - the key used for ordering
- * @param command - the SafeRunnable to execute
- * @param delay - the time from now to delay execution
- * @param unit - the time unit of the delay parameter
- * @return a ScheduledFuture representing pending completion of the task and whose get() method
- * will return null upon completion
+ * {@inheritDoc}
*/
- public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
- return chooseThread(orderingKey).schedule(command, delay, unit);
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ return chooseThread().invokeAll(tasks);
}
/**
- * Creates and executes a periodic action that becomes enabled first after
- * the given initial delay, and subsequently with the given period.
- *
- * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
- *
- * @param command - the SafeRunnable to execute
- * @param initialDelay - the time to delay first execution
- * @param period - the period between successive executions
- * @param unit - the time unit of the initialDelay and period parameters
- * @return a ScheduledFuture representing pending completion of the task, and whose get()
- * method will throw an exception upon cancellation
+ * {@inheritDoc}
*/
- public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
- return chooseThread().scheduleAtFixedRate(command, initialDelay, period, unit);
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit)
+ throws InterruptedException {
+ return chooseThread().invokeAll(tasks, timeout, unit);
}
/**
- * Creates and executes a periodic action that becomes enabled first after
- * the given initial delay, and subsequently with the given period.
- *
- * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
- *
- * @param orderingKey - the key used for ordering
- * @param command - the SafeRunnable to execute
- * @param initialDelay - the time to delay first execution
- * @param period - the period between successive executions
- * @param unit - the time unit of the initialDelay and period parameters
- * @return a ScheduledFuture representing pending completion of the task, and whose get() method
- * will throw an exception upon cancellation
+ * {@inheritDoc}
*/
- public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
- long period, TimeUnit unit) {
- return chooseThread(orderingKey).scheduleAtFixedRate(command, initialDelay, period, unit);
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ return chooseThread().invokeAny(tasks);
}
/**
- * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
- * with the given delay between the termination of one execution and the commencement of the next.
- *
- * <p>For more details check {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}
- * .
- *
- * @param command - the SafeRunnable to execute
- * @param initialDelay - the time to delay first execution
- * @param delay - the delay between the termination of one execution and the commencement of the next
- * @param unit - the time unit of the initialDelay and delay parameters
- * @return a ScheduledFuture representing pending completion of the task, and whose get() method
- * will throw an exception upon cancellation
+ * {@inheritDoc}
*/
- public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
- TimeUnit unit) {
- return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return chooseThread().invokeAny(tasks, timeout, unit);
}
/**
- * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
- * with the given delay between the termination of one execution and the commencement of the next.
- *
- * <p>For more details check {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}
- * .
- *
- * @param orderingKey - the key used for ordering
- * @param command - the SafeRunnable to execute
- * @param initialDelay - the time to delay first execution
- * @param delay - the delay between the termination of one execution and the commencement of the next
- * @param unit - the time unit of the initialDelay and delay parameters
- * @return a ScheduledFuture representing pending completion of the task, and whose get() method
- * will throw an exception upon cancellation
+ * {@inheritDoc}
*/
- public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
- long delay, TimeUnit unit) {
- return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ @Override
+ public void execute(Runnable command) {
+ chooseThread().execute(command);
}
- protected long getThreadID(long orderingKey) {
- // skip hashcode generation in this special case
- if (threadIds.length == 1) {
- return threadIds[0];
- }
-
- return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
- }
/**
* {@inheritDoc}
@@ -497,7 +460,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
@Override
public List<Runnable> shutdownNow() {
List<Runnable> runnables = new ArrayList<Runnable>();
- for (ScheduledExecutorService executor : threads) {
+ for (ExecutorService executor : threads) {
runnables.addAll(executor.shutdownNow());
}
return runnables;
@@ -508,7 +471,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
*/
@Override
public boolean isShutdown() {
- for (ScheduledExecutorService executor : threads) {
+ for (ExecutorService executor : threads) {
if (!executor.isShutdown()) {
return false;
}
@@ -533,7 +496,7 @@ public class OrderedScheduler implements ScheduledExecutorService {
*/
@Override
public boolean isTerminated() {
- for (ScheduledExecutorService executor : threads) {
+ for (ExecutorService executor : threads) {
if (!executor.isTerminated()) {
return false;
}
@@ -558,112 +521,4 @@ public class OrderedScheduler implements ScheduledExecutorService {
}
}
- //
- // Methods for implementing {@link ScheduledExecutorService}
- //
-
- /**
- * {@inheritDoc}
- */
- @Override
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- return chooseThread().schedule(command, delay, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- return chooseThread().schedule(callable, delay, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay, long period, TimeUnit unit) {
- return chooseThread().scheduleAtFixedRate(command, initialDelay, period, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
- long initialDelay, long delay, TimeUnit unit) {
- return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return chooseThread().submit(task);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- return chooseThread().submit(task, result);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Future<?> submit(Runnable task) {
- return chooseThread().submit(task);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- return chooseThread().invokeAll(tasks);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout,
- TimeUnit unit)
- throws InterruptedException {
- return chooseThread().invokeAll(tasks, timeout, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- return chooseThread().invokeAny(tasks);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return chooseThread().invokeAny(tasks, timeout, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void execute(Runnable command) {
- chooseThread().execute(command);
- }
-
}
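The ordering guarantee in the class above comes from routing every task that shares an ordering key to the same single-threaded executor. A minimal sketch of that idea follows. `KeyedExecutorSketch`, `slotFor`, and `runInOrder` are hypothetical names used only for illustration, and `Math.floorMod` is a stand-in for `MathUtils.signSafeMod`, whose exact behaviour is not shown in this diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; this is not the OrderedExecutor class from the commit.
public class KeyedExecutorSketch {
    private final ExecutorService[] threads;

    public KeyedExecutorSketch(int numThreads) {
        threads = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            // One thread per slot, so tasks in a slot run one at a time, in FIFO order.
            threads[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Maps a key to a slot; floorMod keeps the index non-negative for negative keys.
    static int slotFor(long orderingKey, int numThreads) {
        return (int) Math.floorMod(orderingKey, (long) numThreads);
    }

    // Fire-and-forget: no Future is allocated for the caller.
    public void executeOrdered(long orderingKey, Runnable task) {
        threads[slotFor(orderingKey, threads.length)].execute(task);
    }

    // Stops accepting tasks, then waits for the queued ones to finish.
    public boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        for (ExecutorService thread : threads) {
            thread.shutdown();
        }
        boolean terminated = true;
        for (ExecutorService thread : threads) {
            terminated &= thread.awaitTermination(timeout, unit);
        }
        return terminated;
    }

    // Submits `count` tasks under a single key and returns the order in which they ran.
    public static List<Integer> runInOrder(int count) {
        KeyedExecutorSketch executor = new KeyedExecutorSketch(4);
        List<Integer> seen = new CopyOnWriteArrayList<>();
        for (int i = 0; i < count; i++) {
            final int n = i;
            executor.executeOrdered(42L, () -> seen.add(n));
        }
        try {
            if (!executor.shutdownAndWait(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }
}
```

Tasks submitted under different keys may land on different threads and can interleave freely; only tasks that share a key are serialized.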
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index bf2a6fb..6f05832 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -17,32 +17,20 @@
*/
package org.apache.bookkeeper.common.util;
-import static com.google.common.base.Preconditions.checkArgument;
-
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
+
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang.StringUtils;
/**
* This class provides 2 things over the java {@link ScheduledExecutorService}.
@@ -58,19 +46,7 @@ import org.apache.commons.lang.StringUtils;
* achieved by hashing the key objects to threads by their {@link #hashCode()}
* method.
*/
-public class OrderedScheduler implements ScheduledExecutorService {
- public static final int NO_TASK_LIMIT = -1;
- protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
-
- final String name;
- final ListeningScheduledExecutorService threads[];
- final long threadIds[];
- final Random rand = new Random();
- final OpStatsLogger taskExecutionStats;
- final OpStatsLogger taskPendingStats;
- final boolean traceTaskExecution;
- final long warnTimeMicroSec;
- final int maxTasksInQueue;
+public class OrderedScheduler extends OrderedExecutor implements ScheduledExecutorService {
/**
* Create a builder to build ordered scheduler.
@@ -84,61 +60,13 @@ public class OrderedScheduler implements ScheduledExecutorService {
/**
* Builder to build ordered scheduler.
*/
- public static class SchedulerBuilder extends AbstractBuilder<OrderedScheduler> {}
-
- /**
- * Abstract builder class to build {@link OrderedScheduler}.
- */
- public abstract static class AbstractBuilder<T extends OrderedScheduler> {
- protected String name = getClass().getSimpleName();
- protected int numThreads = Runtime.getRuntime().availableProcessors();
- protected ThreadFactory threadFactory = null;
- protected StatsLogger statsLogger = NullStatsLogger.INSTANCE;
- protected boolean traceTaskExecution = false;
- protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
- protected int maxTasksInQueue = NO_TASK_LIMIT;
-
- public AbstractBuilder<T> name(String name) {
- this.name = name;
- return this;
- }
-
- public AbstractBuilder<T> numThreads(int num) {
- this.numThreads = num;
- return this;
- }
-
- public AbstractBuilder<T> maxTasksInQueue(int num) {
- this.maxTasksInQueue = num;
- return this;
- }
-
- public AbstractBuilder<T> threadFactory(ThreadFactory threadFactory) {
- this.threadFactory = threadFactory;
- return this;
- }
-
- public AbstractBuilder<T> statsLogger(StatsLogger statsLogger) {
- this.statsLogger = statsLogger;
- return this;
- }
-
- public AbstractBuilder<T> traceTaskExecution(boolean enabled) {
- this.traceTaskExecution = enabled;
- return this;
- }
-
- public AbstractBuilder<T> traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
- this.warnTimeMicroSec = warnTimeMicroSec;
- return this;
- }
-
- @SuppressWarnings("unchecked")
- public T build() {
+ public static class SchedulerBuilder extends OrderedExecutor.AbstractBuilder<OrderedScheduler> {
+ @Override
+ public OrderedScheduler build() {
if (null == threadFactory) {
threadFactory = new DefaultThreadFactory(name);
}
- return (T) new OrderedScheduler(
+ return new OrderedScheduler(
name,
numThreads,
threadFactory,
@@ -147,32 +75,6 @@ public class OrderedScheduler implements ScheduledExecutorService {
warnTimeMicroSec,
maxTasksInQueue);
}
-
- }
-
- private class TimedRunnable implements SafeRunnable {
- final SafeRunnable runnable;
- final long initNanos;
-
- TimedRunnable(SafeRunnable runnable) {
- this.runnable = runnable;
- this.initNanos = MathUtils.nowInNano();
- }
-
- @Override
- public void safeRun() {
- taskPendingStats.registerSuccessfulEvent(
- MathUtils.elapsedNanos(initNanos),
- TimeUnit.NANOSECONDS);
- long startNanos = MathUtils.nowInNano();
- this.runnable.safeRun();
- long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
- taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
- if (elapsedMicroSec >= warnTimeMicroSec) {
- LOGGER.warn("Runnable {}:{} took too long {} micros to execute.",
- runnable, runnable.getClass(), elapsedMicroSec);
- }
- }
}
/**
@@ -191,172 +93,40 @@ public class OrderedScheduler implements ScheduledExecutorService {
* @param warnTimeMicroSec
* - log long task exec warning after this interval
*/
- protected OrderedScheduler(String baseName,
+ private OrderedScheduler(String baseName,
int numThreads,
ThreadFactory threadFactory,
StatsLogger statsLogger,
boolean traceTaskExecution,
long warnTimeMicroSec,
int maxTasksInQueue) {
- checkArgument(numThreads > 0);
- checkArgument(!StringUtils.isBlank(baseName));
-
- this.maxTasksInQueue = maxTasksInQueue;
- this.warnTimeMicroSec = warnTimeMicroSec;
- name = baseName;
- threads = new ListeningScheduledExecutorService[numThreads];
- threadIds = new long[numThreads];
- for (int i = 0; i < numThreads; i++) {
- final ScheduledThreadPoolExecutor thread = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder()
- .setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
- .setThreadFactory(threadFactory)
- .build());
- threads[i] = new BoundedScheduledExecutorService(thread, this.maxTasksInQueue);
-
- final int idx = i;
- try {
- threads[idx].submit(new SafeRunnable() {
- @Override
- public void safeRun() {
- threadIds[idx] = Thread.currentThread().getId();
- }
- }).get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Couldn't start thread " + i, e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Couldn't start thread " + i, e);
- }
-
- // Register gauges
- statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return thread.getQueue().size();
- }
- });
- statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
+ super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
+ }
- @Override
- public Number getSample() {
- return thread.getCompletedTaskCount();
- }
- });
- statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
- @Override
- public Number getSample() {
- return thread.getTaskCount();
- }
- });
- }
+ @Override
+ protected ScheduledThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
+ return new ScheduledThreadPoolExecutor(1, factory);
+ }
- // Stats
- this.taskExecutionStats = statsLogger.scope(name).getOpStatsLogger("task_execution");
- this.taskPendingStats = statsLogger.scope(name).getOpStatsLogger("task_queued");
- this.traceTaskExecution = traceTaskExecution;
+ @Override
+ protected ListeningScheduledExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
+ return new BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, this.maxTasksInQueue);
}
+ @Override
public ListeningScheduledExecutorService chooseThread() {
- // skip random # generation in this special case
- if (threads.length == 1) {
- return threads[0];
- }
-
- return threads[rand.nextInt(threads.length)];
+ return (ListeningScheduledExecutorService) super.chooseThread();
}
+ @Override
public ListeningScheduledExecutorService chooseThread(Object orderingKey) {
- // skip hashcode generation in this special case
- if (threads.length == 1) {
- return threads[0];
- }
-
- return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)];
+ return (ListeningScheduledExecutorService) super.chooseThread(orderingKey);
}
- /**
- * skip hashcode generation in this special case.
- *
- * @param orderingKey long ordering key
- * @return the thread for executing this order key
- */
+ @Override
public ListeningScheduledExecutorService chooseThread(long orderingKey) {
- if (threads.length == 1) {
- return threads[0];
- }
-
- return threads[MathUtils.signSafeMod(orderingKey, threads.length)];
- }
-
- private SafeRunnable timedRunnable(SafeRunnable r) {
- if (traceTaskExecution) {
- return new TimedRunnable(r);
- } else {
- return r;
- }
- }
-
- /**
- * schedules a one time action to execute.
- */
- public void submit(SafeRunnable r) {
- chooseThread().submit(timedRunnable(r));
- }
-
- /**
- * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
- *
- * @param orderingKey order key to submit the task
- * @param r task to run
- * @return listenable future on the completion of the task
- */
- public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
- return chooseThread(orderingKey).submit(timedRunnable(r));
- }
-
- /**
- * schedules a one time action to execute with an ordering guarantee on the <tt>orderingKey</tt>.
- *
- * @param orderingKey order key to submit the task
- * @param r task to run
- */
- public void executeOrdered(Object orderingKey, SafeRunnable r) {
- chooseThread(orderingKey).execute(timedRunnable(r));
- }
-
- /**
- * schedules a one time action to execute with an ordering guarantee on the key.
- *
- * @param orderingKey
- * @param r
- */
- public void submitOrdered(long orderingKey, SafeRunnable r) {
- chooseThread(orderingKey).execute(timedRunnable(r));
- }
-
- /**
- * schedules a one time action to execute with an ordering guarantee on the key.
- *
- * @param orderingKey
- * @param r
- */
- public void submitOrdered(int orderingKey, SafeRunnable r) {
- chooseThread(orderingKey).execute(timedRunnable(r));
+ return (ListeningScheduledExecutorService) super.chooseThread(orderingKey);
}
/**
@@ -472,91 +242,6 @@ public class OrderedScheduler implements ScheduledExecutorService {
return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
- protected long getThreadID(long orderingKey) {
- // skip hashcode generation in this special case
- if (threadIds.length == 1) {
- return threadIds[0];
- }
-
- return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void shutdown() {
- for (int i = 0; i < threads.length; i++) {
- threads[i].shutdown();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<Runnable> shutdownNow() {
- List<Runnable> runnables = new ArrayList<Runnable>();
- for (ScheduledExecutorService executor : threads) {
- runnables.addAll(executor.shutdownNow());
- }
- return runnables;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isShutdown() {
- for (ScheduledExecutorService executor : threads) {
- if (!executor.isShutdown()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- boolean ret = true;
- for (int i = 0; i < threads.length; i++) {
- ret = ret && threads[i].awaitTermination(timeout, unit);
- }
- return ret;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isTerminated() {
- for (ScheduledExecutorService executor : threads) {
- if (!executor.isTerminated()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Force threads shutdown (cancel active requests) after specified delay,
- * to be used after shutdown() rejects new requests.
- */
- public void forceShutdown(long timeout, TimeUnit unit) {
- for (int i = 0; i < threads.length; i++) {
- try {
- if (!threads[i].awaitTermination(timeout, unit)) {
- threads[i].shutdownNow();
- }
- } catch (InterruptedException exception) {
- threads[i].shutdownNow();
- Thread.currentThread().interrupt();
- }
- }
- }
//
// Methods for implementing {@link ScheduledExecutorService}
@@ -596,74 +281,4 @@ public class OrderedScheduler implements ScheduledExecutorService {
return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return chooseThread().submit(task);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- return chooseThread().submit(task, result);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Future<?> submit(Runnable task) {
- return chooseThread().submit(task);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- return chooseThread().invokeAll(tasks);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout,
- TimeUnit unit)
- throws InterruptedException {
- return chooseThread().invokeAll(tasks, timeout, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- return chooseThread().invokeAny(tasks);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return chooseThread().invokeAny(tasks, timeout, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void execute(Runnable command) {
- chooseThread().execute(command);
- }
-
}
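After this change `OrderedScheduler` supplies its own per-thread executor through an overridable factory method and narrows the return type of `chooseThread` with a cast. The sketch below shows that pattern in isolation, assuming plain JDK executors in place of the Guava listening wrappers. `CovariantSketch`, `BasePool`, `SchedulingPool`, `createThread`, and `demo` are hypothetical names, not part of the commit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a base pool whose subclass swaps in scheduling-capable threads.
public class CovariantSketch {

    static class BasePool {
        private final ExecutorService[] threads;

        BasePool(int numThreads) {
            threads = new ExecutorService[numThreads];
            for (int i = 0; i < numThreads; i++) {
                // Runs before any subclass constructor body, so overrides of
                // createThread() must not depend on subclass fields.
                threads[i] = createThread();
            }
        }

        // Factory hook: subclasses decide which kind of executor backs each slot.
        protected ExecutorService createThread() {
            return Executors.newSingleThreadExecutor();
        }

        public ExecutorService chooseThread(long orderingKey) {
            return threads[(int) Math.floorMod(orderingKey, (long) threads.length)];
        }

        public void shutdown() {
            for (ExecutorService thread : threads) {
                thread.shutdown();
            }
        }
    }

    static class SchedulingPool extends BasePool {
        SchedulingPool(int numThreads) {
            super(numThreads);
        }

        @Override
        protected ScheduledExecutorService createThread() {
            return Executors.newSingleThreadScheduledExecutor();
        }

        // Covariant return type: the cast is safe because every slot was
        // created by the override above.
        @Override
        public ScheduledExecutorService chooseThread(long orderingKey) {
            return (ScheduledExecutorService) super.chooseThread(orderingKey);
        }
    }

    // Schedules a delayed task through the narrowed type and returns its result.
    public static String demo() {
        SchedulingPool pool = new SchedulingPool(2);
        try {
            ScheduledFuture<String> future =
                    pool.chooseThread(7L).schedule(() -> "ran", 10, TimeUnit.MILLISECONDS);
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The cast trades a compile-time guarantee for a runtime one; it holds only as long as the factory override and the accessor override stay in the same class.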
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
index 89214e3..6a52ef5 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
@@ -94,7 +94,7 @@ public final class Retries {
scheduler,
null);
} else {
- scheduler.submitOrdered(key, () -> {
+ scheduler.executeOrdered(key, () -> {
execute(
future,
backoffs.iterator(),
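The rename from `submitOrdered()` to `executeOrdered()` follows the JDK convention that `submit()` returns a `Future` while `execute()` returns nothing. A small sketch of the difference using only `java.util.concurrent` follows; `ExecuteVsSubmit` and `demo` are hypothetical names for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the JDK naming convention the commit adopts.
public class ExecuteVsSubmit {

    // Runs one task through each entry point and returns how many tasks ran.
    public static int demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger counter = new AtomicInteger();
        try {
            // submit() wraps the task in a Future that the caller can wait on or cancel.
            Future<?> future = pool.submit(() -> { counter.incrementAndGet(); });
            future.get(5, TimeUnit.SECONDS);

            // execute() hands the task over and returns void; no Future is created.
            pool.execute(() -> { counter.incrementAndGet(); });

            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // no-op if already shut down
        }
        return counter.get();
    }
}
```

Callers that never inspect the returned future, as in the retry path above, lose nothing by switching to the `execute` form.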
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 546a8ba..d5f6c8d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.UpdateLedgerOp;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
@@ -112,7 +113,6 @@ import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.LedgerIdFormatter;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.Tool;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
@@ -814,7 +814,7 @@ public class BookieShell implements Tool {
} else {
// Use BookieClient to target a specific bookie
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+ OrderedExecutor executor = OrderedExecutor.newBuilder()
.numThreads(1)
.name("BookieClientScheduler")
.build();
@@ -1022,6 +1022,7 @@ public class BookieShell implements Tool {
return ledgerId;
}
+ @Override
public void operationComplete(int rc, LedgerMetadata result) {
if (rc != 0) {
setException(BKException.create(rc));
@@ -2489,6 +2490,7 @@ public class BookieShell implements Tool {
return "Convert bookie indexes from InterleavedStorage to DbLedgerStorage format";
}
+ @Override
String getUsage() {
return CMD_CONVERT_TO_DB_STORAGE;
}
@@ -2584,6 +2586,7 @@ public class BookieShell implements Tool {
return "Convert bookie indexes from DbLedgerStorage to InterleavedStorage format";
}
+ @Override
String getUsage() {
return CMD_CONVERT_TO_INTERLEAVED_STORAGE;
}
@@ -2699,6 +2702,7 @@ public class BookieShell implements Tool {
return "Rebuild DbLedgerStorage locations index by scanning the entry logs";
}
+ @Override
String getUsage() {
return CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX;
}
@@ -3116,6 +3120,7 @@ public class BookieShell implements Tool {
};
return new Iterable<SortedMap<Long, Long>>() {
+ @Override
public Iterator<SortedMap<Long, Long>> iterator() {
return iterator;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index f0c5249..9862665 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -57,6 +57,8 @@ import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.Feature;
@@ -79,7 +81,6 @@ import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.configuration.ConfigurationException;
@@ -134,8 +135,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
final BookieClient bookieClient;
final BookieWatcher bookieWatcher;
- final OrderedSafeExecutor mainWorkerPool;
- final ScheduledExecutorService scheduler;
+ final OrderedExecutor mainWorkerPool;
+ final OrderedScheduler scheduler;
final HashedWheelTimer requestTimer;
final boolean ownTimer;
final FeatureProvider featureProvider;
@@ -421,9 +422,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
this.reorderReadSequence = conf.isReorderReadSequenceEnabled();
// initialize resources
- this.scheduler = Executors
- .newSingleThreadScheduledExecutor(new DefaultThreadFactory("BookKeeperClientScheduler"));
- this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
+ this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
+ this.mainWorkerPool = OrderedExecutor.newBuilder()
.name("BookKeeperClientWorker")
.numThreads(conf.getNumWorkerThreads())
.statsLogger(statsLogger)
@@ -672,12 +672,12 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
}
@VisibleForTesting
- OrderedSafeExecutor getMainWorkerPool() {
+ OrderedExecutor getMainWorkerPool() {
return mainWorkerPool;
}
@VisibleForTesting
- ScheduledExecutorService getScheduler() {
+ OrderedScheduler getScheduler() {
return scheduler;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index c77843a..96c8998 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -116,7 +116,7 @@ interface ExplicitLacFlushPolicy {
}
};
try {
- scheduledFuture = lh.bk.getMainWorkerPool().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
+ scheduledFuture = lh.bk.getScheduler().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
explicitLacIntervalInMs, explicitLacIntervalInMs, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException re) {
LOG.error("Scheduling of ExplictLastAddConfirmedFlush for ledger: {} has failed because of {}",
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
index 117e359..45ec425 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
@@ -30,7 +30,7 @@ import org.apache.bookkeeper.client.SyncCallbackUtils.SyncDeleteCallback;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
import org.apache.bookkeeper.versioning.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
* Encapsulates asynchronous ledger delete operation.
*
*/
-class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> {
+class LedgerDeleteOp extends OrderedGenericCallback<Void> {
static final Logger LOG = LoggerFactory.getLogger(LedgerDeleteOp.class);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 5c2ff8c..3f599e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -46,7 +46,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -416,7 +416,7 @@ public class LedgerFragmentReplicator {
// try again, the previous success (with which this has
// conflicted) will have updated the stat other operations
// such as (addEnsemble) would update it too.
- lh.rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
+ lh.rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(
lh.bk.mainWorkerPool, lh.getId()) {
@Override
public void safeOperationComplete(int rc,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 8db5667..a9a85bc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -83,7 +83,7 @@ import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.proto.checksum.MacDigestManager;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.collections4.IteratorUtils;
import org.slf4j.Logger;
@@ -176,6 +176,7 @@ public class LedgerHandle implements WriteHandle {
this.bookieFailureHistory = CacheBuilder.newBuilder()
.expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
.build(new CacheLoader<BookieSocketAddress, Long>() {
+ @Override
public Long load(BookieSocketAddress key) {
return -1L;
}
@@ -200,10 +201,12 @@ public class LedgerHandle implements WriteHandle {
lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS,
new Gauge<Integer>() {
- public Integer getDefaultValue() {
+ @Override
+ public Integer getDefaultValue() {
return 0;
}
- public Integer getSample() {
+ @Override
+ public Integer getSample() {
return pendingAddOps.size();
}
});
@@ -240,6 +243,7 @@ public class LedgerHandle implements WriteHandle {
*
* @return the id of the ledger
*/
+ @Override
public long getId() {
return ledgerId;
}
@@ -344,6 +348,7 @@ public class LedgerHandle implements WriteHandle {
*
* @return the length of the ledger in bytes
*/
+ @Override
public synchronized long getLength() {
return this.length;
}
@@ -453,7 +458,7 @@ public class LedgerHandle implements WriteHandle {
* @param rc
*/
void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
- bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() {
+ bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
final long prevLastEntryId;
@@ -512,7 +517,7 @@ public class LedgerHandle implements WriteHandle {
+ metadata.getLastEntryId() + " with this many bytes: " + metadata.getLength());
}
- final class CloseCb extends OrderedSafeGenericCallback<Void> {
+ final class CloseCb extends OrderedGenericCallback<Void> {
CloseCb() {
super(bk.getMainWorkerPool(), ledgerId);
}
@@ -520,7 +525,7 @@ public class LedgerHandle implements WriteHandle {
@Override
public void safeOperationComplete(final int rc, Void result) {
if (rc == BKException.Code.MetadataVersionException) {
- rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
+ rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
ledgerId) {
@Override
public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
@@ -830,7 +835,7 @@ public class LedgerHandle implements WriteHandle {
boolean isRecoveryRead) {
PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), firstEntry, lastEntry, isRecoveryRead);
if (!bk.isClosed()) {
- bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+ bk.getMainWorkerPool().executeOrdered(ledgerId, op);
} else {
op.future().completeExceptionally(BKException.create(ClientClosedException));
}
@@ -1099,7 +1104,7 @@ public class LedgerHandle implements WriteHandle {
}
try {
- bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+ bk.getMainWorkerPool().executeOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
@@ -1689,7 +1694,7 @@ public class LedgerHandle implements WriteHandle {
* reformed ensemble. On MetadataVersionException, will reread latest
* ledgerMetadata and act upon.
*/
- private final class ChangeEnsembleCb extends OrderedSafeGenericCallback<Void> {
+ private final class ChangeEnsembleCb extends OrderedGenericCallback<Void> {
private final EnsembleInfo ensembleInfo;
private final int curBlockAddCompletions;
private final int ensembleChangeIdx;
@@ -1749,7 +1754,7 @@ public class LedgerHandle implements WriteHandle {
* Callback which is reading the ledgerMetadata present in zk. This will try
* to resolve the version conflicts.
*/
- private final class ReReadLedgerMetadataCb extends OrderedSafeGenericCallback<LedgerMetadata> {
+ private final class ReReadLedgerMetadataCb extends OrderedGenericCallback<LedgerMetadata> {
private final int rc;
private final EnsembleInfo ensembleInfo;
private final int curBlockAddCompletions;
@@ -2002,11 +2007,11 @@ public class LedgerHandle implements WriteHandle {
return;
}
- writeLedgerConfig(new OrderedSafeGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
+ writeLedgerConfig(new OrderedGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
@Override
public void safeOperationComplete(final int rc, Void result) {
if (rc == BKException.Code.MetadataVersionException) {
- rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
+ rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
ledgerId) {
@Override
public void safeOperationComplete(int rc, LedgerMetadata newMeta) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index afd56cf..0eaf0b5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -238,7 +238,7 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
}
try {
- bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+ bk.getMainWorkerPool().executeOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 16b577b..33cfaf2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -38,7 +38,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.util.OrderedGenericCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -183,7 +183,7 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
}
if (doRecovery) {
- lh.recover(new OrderedSafeGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
+ lh.recover(new OrderedGenericCallback<Void>(bk.getMainWorkerPool(), ledgerId) {
@Override
public void safeOperationComplete(int rc, Void result) {
if (rc == BKException.Code.OK) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index dd5058e..eb95766 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -143,7 +143,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
void timeoutQuorumWait() {
try {
- lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, new SafeRunnable() {
+ lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
if (completed) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 484e9c7..a41207d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -505,7 +505,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
}
public void submit() {
- lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, this);
+ lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
}
void initiate() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 9d62f72..eef0e70 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -163,7 +163,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
}
if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
try {
- bk.getMainWorkerPool().submitOrdered(ledgerId, new MetadataUpdater(newMetadata));
+ bk.getMainWorkerPool().executeOrdered(ledgerId, new MetadataUpdater(newMetadata));
} catch (RejectedExecutionException ree) {
LOG.error("Failed on submitting updater to update ledger metadata on ledger {} : {}",
ledgerId, newMetadata);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 285a06a..c160b20 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -63,7 +64,6 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,7 +77,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
// This is global state that should be across all BookieClients
AtomicLong totalBytesOutstanding = new AtomicLong();
- OrderedSafeExecutor executor;
+ OrderedExecutor executor;
ScheduledExecutorService scheduler;
ScheduledFuture<?> timeoutFuture;
@@ -97,7 +97,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
private final long bookieErrorThresholdPerInterval;
public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
- OrderedSafeExecutor executor, ScheduledExecutorService scheduler,
+ OrderedExecutor executor, ScheduledExecutorService scheduler,
StatsLogger statsLogger) throws IOException {
this.conf = conf;
this.eventLoopGroup = eventLoopGroup;
@@ -198,7 +198,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
try {
- executor.submitOrdered(ledgerId, safeRun(() -> {
+ executor.executeOrdered(ledgerId, safeRun(() -> {
cb.writeLacComplete(rc, ledgerId, addr, ctx);
}));
} catch (RejectedExecutionException re) {
@@ -219,7 +219,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
final WriteCallback cb,
final Object ctx) {
try {
- executor.submitOrdered(ledgerId, new SafeRunnable() {
+ executor.executeOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
@@ -266,7 +266,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
final ReadEntryCallback cb,
final Object ctx) {
try {
- executor.submitOrdered(ledgerId, new SafeRunnable() {
+ executor.executeOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx);
@@ -362,7 +362,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
try {
- executor.submitOrdered(ledgerId, safeRun(() -> {
+ executor.executeOrdered(ledgerId, safeRun(() -> {
cb.readLacComplete(rc, ledgerId, null, null, ctx);
}));
} catch (RejectedExecutionException re) {
@@ -526,7 +526,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
byte hello[] = "hello".getBytes(UTF_8);
long ledger = Long.parseLong(args[2]);
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
- OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+ OrderedExecutor executor = OrderedExecutor.newBuilder()
.name("BookieClientWorker")
.numThreads(1)
.build();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 94a43a9..7e65ced 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -59,7 +59,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.AuthToken;
import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.stats.Counter;
@@ -68,7 +68,6 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,12 +92,12 @@ public class BookieRequestProcessor implements RequestProcessor {
/**
* The threadpool used to execute all read entry requests issued to this server.
*/
- private final OrderedSafeExecutor readThreadPool;
+ private final OrderedExecutor readThreadPool;
/**
* The threadpool used to execute all add entry requests issued to this server.
*/
- private final OrderedSafeExecutor writeThreadPool;
+ private final OrderedExecutor writeThreadPool;
/**
* TLS management.
@@ -109,12 +108,12 @@ public class BookieRequestProcessor implements RequestProcessor {
* The threadpool used to execute all long poll requests issued to this server
* after they are done waiting.
*/
- private final OrderedSafeExecutor longPollThreadPool;
+ private final OrderedExecutor longPollThreadPool;
/**
* The threadpool used to execute high priority requests.
*/
- private final OrderedSafeExecutor highPriorityThreadPool;
+ private final OrderedExecutor highPriorityThreadPool;
/**
* The Timer used to time out requests for long polling.
@@ -162,11 +161,11 @@ public class BookieRequestProcessor implements RequestProcessor {
this.longPollThreadPool = createExecutor(
this.serverCfg.getNumLongPollWorkerThreads(),
"BookieLongPollThread-" + serverCfg.getBookiePort(),
- OrderedScheduler.NO_TASK_LIMIT, statsLogger);
+ OrderedExecutor.NO_TASK_LIMIT, statsLogger);
this.highPriorityThreadPool = createExecutor(
this.serverCfg.getNumHighPriorityWorkerThreads(),
"BookieHighPriorityThread-" + serverCfg.getBookiePort(),
- OrderedScheduler.NO_TASK_LIMIT, statsLogger);
+ OrderedExecutor.NO_TASK_LIMIT, statsLogger);
this.requestTimer = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
this.serverCfg.getRequestTimerTickDurationMs(),
@@ -208,7 +207,7 @@ public class BookieRequestProcessor implements RequestProcessor {
shutdownExecutor(highPriorityThreadPool);
}
- private OrderedSafeExecutor createExecutor(
+ private OrderedExecutor createExecutor(
int numThreads,
String nameFormat,
int maxTasksInQueue,
@@ -216,7 +215,7 @@ public class BookieRequestProcessor implements RequestProcessor {
if (numThreads <= 0) {
return null;
} else {
- return OrderedSafeExecutor.newBuilder()
+ return OrderedExecutor.newBuilder()
.numThreads(numThreads)
.name(nameFormat)
.traceTaskExecution(serverCfg.getEnableTaskExecutionStats())
@@ -226,7 +225,7 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
- private void shutdownExecutor(OrderedSafeExecutor service) {
+ private void shutdownExecutor(OrderedExecutor service) {
if (null != service) {
service.shutdown();
}
@@ -310,7 +309,7 @@ public class BookieRequestProcessor implements RequestProcessor {
if (null == writeThreadPool) {
writeLac.run();
} else {
- writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), writeLac);
+ writeThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), writeLac);
}
}
@@ -319,14 +318,14 @@ public class BookieRequestProcessor implements RequestProcessor {
if (null == readThreadPool) {
readLac.run();
} else {
- readThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), readLac);
+ readThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), readLac);
}
}
private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
- final OrderedSafeExecutor threadPool;
+ final OrderedExecutor threadPool;
if (RequestUtils.isHighPriority(r)) {
threadPool = highPriorityThreadPool;
} else {
@@ -337,7 +336,7 @@ public class BookieRequestProcessor implements RequestProcessor {
write.run();
} else {
try {
- threadPool.submitOrdered(r.getAddRequest().getLedgerId(), write);
+ threadPool.executeOrdered(r.getAddRequest().getLedgerId(), write);
} catch (RejectedExecutionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests",
@@ -361,7 +360,7 @@ public class BookieRequestProcessor implements RequestProcessor {
ExecutorService fenceThread = null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
final ReadEntryProcessorV3 read;
- final OrderedSafeExecutor threadPool;
+ final OrderedExecutor threadPool;
if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
ExecutorService lpThread = null == longPollThreadPool ? null : longPollThreadPool.chooseThread(c);
@@ -387,7 +386,7 @@ public class BookieRequestProcessor implements RequestProcessor {
read.run();
} else {
try {
- threadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
+ threadPool.executeOrdered(r.getReadRequest().getLedgerId(), read);
} catch (RejectedExecutionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests",
@@ -464,7 +463,7 @@ public class BookieRequestProcessor implements RequestProcessor {
// If it's a high priority add (usually as part of recovery process), we want to make sure it gets
// executed as fast as possible, so bypass the normal writeThreadPool and execute in highPriorityThreadPool
- final OrderedSafeExecutor threadPool;
+ final OrderedExecutor threadPool;
if (r.isHighPriority()) {
threadPool = highPriorityThreadPool;
} else {
@@ -475,7 +474,7 @@ public class BookieRequestProcessor implements RequestProcessor {
write.run();
} else {
try {
- threadPool.submitOrdered(r.getLedgerId(), write);
+ threadPool.executeOrdered(r.getLedgerId(), write);
} catch (RejectedExecutionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", r.ledgerId,
@@ -494,7 +493,7 @@ public class BookieRequestProcessor implements RequestProcessor {
// If it's a high priority read (fencing or as part of recovery process), we want to make sure it
// gets executed as fast as possible, so bypass the normal readThreadPool
// and execute in highPriorityThreadPool
- final OrderedSafeExecutor threadPool;
+ final OrderedExecutor threadPool;
if (r.isHighPriority() || r.isFencing()) {
threadPool = highPriorityThreadPool;
} else {
@@ -505,7 +504,7 @@ public class BookieRequestProcessor implements RequestProcessor {
read.run();
} else {
try {
- threadPool.submitOrdered(r.getLedgerId(), read);
+ threadPool.executeOrdered(r.getLedgerId(), read);
} catch (RejectedExecutionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", r.ledgerId,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 439320b..20fcdda 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -85,6 +85,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -120,7 +121,6 @@ import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
@@ -149,7 +149,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
final BookieSocketAddress addr;
final EventLoopGroup eventLoopGroup;
- final OrderedSafeExecutor executor;
+ final OrderedExecutor executor;
final long addEntryTimeoutNanos;
final long readEntryTimeoutNanos;
final int maxFrameSize;
@@ -213,13 +213,13 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
private final ExtensionRegistry extRegistry;
private final SecurityHandlerFactory shFactory;
- public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup,
+ public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieSocketAddress addr) throws SecurityException {
this(new ClientConfiguration(), executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE, null, null,
null);
}
- public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup,
+ public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieSocketAddress addr,
ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry) throws SecurityException {
@@ -227,7 +227,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
authProviderFactory, extRegistry, null);
}
- public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
+ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
@@ -236,7 +236,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
authProviderFactory, extRegistry, pcbcPool, null);
}
- public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
+ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
@@ -1111,7 +1111,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
response.release();
} else {
long orderingKey = completionValue.ledgerId;
- executor.submitOrdered(orderingKey,
+ executor.executeOrdered(orderingKey,
ReadV2ResponseCallback.create(completionValue, response.ledgerId, response.entryId,
status, response));
}
@@ -1226,7 +1226,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
} else {
long orderingKey = completionValue.ledgerId;
- executor.submitOrdered(orderingKey, new SafeRunnable() {
+ executor.executeOrdered(orderingKey, new SafeRunnable() {
@Override
public void safeRun() {
completionValue.handleV3Response(response);
@@ -1400,7 +1400,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
protected void errorOutAndRunCallback(final Runnable callback) {
- executor.submitOrdered(ledgerId,
+ executor.executeOrdered(ledgerId,
new SafeRunnable() {
@Override
public void safeRun() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java
new file mode 100644
index 0000000..73150ad
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic callback implementation which will run the
+ * callback in the thread which matches the ordering key.
+ */
+public abstract class OrderedGenericCallback<T> implements GenericCallback<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(OrderedGenericCallback.class);
+
+ private final OrderedExecutor executor;
+ private final long orderingKey;
+
+ /**
+ * @param executor The executor on which to run the callback
+ * @param orderingKey Key used to decide which thread the callback
+ * should run on.
+ */
+ public OrderedGenericCallback(OrderedExecutor executor, long orderingKey) {
+ this.executor = executor;
+ this.orderingKey = orderingKey;
+ }
+
+ @Override
+ public final void operationComplete(final int rc, final T result) {
+ // During closing, callbacks that error out might try to submit to
+ // the executor again. If the submission would go to the same thread, we
+ // don't need to submit to the executor again. This is also an optimization
+ // for callback submission.
+ if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
+ safeOperationComplete(rc, result);
+ } else {
+ try {
+ executor.executeOrdered(orderingKey, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ safeOperationComplete(rc, result);
+ }
+ @Override
+ public String toString() {
+ return String.format("Callback(key=%s, name=%s)",
+ orderingKey,
+ OrderedGenericCallback.this);
+ }
+ });
+ } catch (RejectedExecutionException re) {
+ LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
+ }
+ }
+ }
+
+ public abstract void safeOperationComplete(int rc, T result);
+}
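
The `OrderedGenericCallback` above relies on one property of the executor: tasks passed to `executeOrdered()` with the same ordering key always land on the same thread, so they run in submission order. The following is a minimal sketch of that idea using only JDK executors. The class name `KeyOrderedExecutorSketch` and its methods are hypothetical and are not the BookKeeper `OrderedExecutor` implementation; the modulo-based key-to-thread mapping is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of key-ordered execution; not the BookKeeper class. */
public class KeyOrderedExecutorSketch {
    // One single-threaded executor per slot: each slot runs its tasks in FIFO order.
    private final ExecutorService[] threads;

    public KeyOrderedExecutorSketch(int numThreads) {
        threads = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Assumed mapping: the same key always selects the same slot.
    private ExecutorService chooseThread(long orderingKey) {
        return threads[(int) Math.floorMod(orderingKey, (long) threads.length)];
    }

    /** Fire-and-forget, like Executor.execute(): no future object is created. */
    public void executeOrdered(long orderingKey, Runnable task) {
        chooseThread(orderingKey).execute(task);
    }

    public void shutdownAndWait() throws InterruptedException {
        for (ExecutorService t : threads) {
            t.shutdown();
        }
        for (ExecutorService t : threads) {
            t.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    /** Submits 100 tasks under one key and returns the order in which they ran. */
    public static List<Integer> runDemo() throws InterruptedException {
        KeyOrderedExecutorSketch executor = new KeyOrderedExecutorSketch(4);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        long ledgerId = 42L;
        for (int i = 0; i < 100; i++) {
            final int n = i;
            executor.executeOrdered(ledgerId, () -> seen.add(n));
        }
        executor.shutdownAndWait();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

Tasks submitted under different keys may map to different slots and run in parallel; only per-key order is preserved, which matches the per-ledger ordering the callers in this diff depend on.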
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
deleted file mode 100644
index f3af7df..0000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.util;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides 2 things over the java {@link ScheduledExecutorService}.
- *
- * <p>1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
- * This means that exceptions in scheduled tasks wont go unnoticed and will be
- * logged.
- *
- * <p>2. It supports submitting tasks with an ordering key, so that tasks submitted
- * with the same key will always be executed in order, but tasks across
- * different keys can be unordered. This retains parallelism while retaining the
- * basic amount of ordering we want (e.g. , per ledger handle). Ordering is
- * achieved by hashing the key objects to threads by their {@link #hashCode()}
- * method.
- *
- * <p>Note: deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.OrderedScheduler}.
- */
-public class OrderedSafeExecutor extends org.apache.bookkeeper.common.util.OrderedScheduler {
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * A builder class for an OrderedSafeExecutor.
- */
- public static class Builder extends AbstractBuilder<OrderedSafeExecutor> {
-
- public OrderedSafeExecutor build() {
- if (null == threadFactory) {
- threadFactory = new DefaultThreadFactory("bookkeeper-ordered-safe-executor");
- }
- return new OrderedSafeExecutor(name, numThreads, threadFactory, statsLogger,
- traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
- }
- }
-
- /**
- * Constructs Safe executor.
- *
- * @param numThreads
- * - number of threads
- * @param baseName
- * - base name of executor threads
- * @param threadFactory
- * - for constructing threads
- * @param statsLogger
- * - for reporting executor stats
- * @param traceTaskExecution
- * - should we stat task execution
- * @param warnTimeMicroSec
- * - log long task exec warning after this interval
- * @param maxTasksInQueue
- * - maximum items allowed in a thread queue. -1 for no limit
- */
- private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
- StatsLogger statsLogger, boolean traceTaskExecution,
- long warnTimeMicroSec, int maxTasksInQueue) {
- super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
- }
-
- /**
- * Schedules a one time action to execute.
- */
- public void submit(SafeRunnable r) {
- super.submit(r);
- }
-
- /**
- * Schedules a one time action to execute with an ordering guarantee on the key.
- * @param orderingKey
- * @param r
- */
- public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
- return super.submitOrdered(orderingKey, r);
- }
-
- /**
- * Schedules a one time action to execute with an ordering guarantee on the key.
- * @param orderingKey
- * @param r
- */
- public void submitOrdered(long orderingKey, SafeRunnable r) {
- super.submitOrdered(orderingKey, r);
- }
-
- /**
- * Schedules a one time action to execute with an ordering guarantee on the key.
- * @param orderingKey
- * @param r
- */
- public void submitOrdered(int orderingKey, SafeRunnable r) {
- super.submitOrdered(orderingKey, r);
- }
-
- /**
- * Creates and executes a one-shot action that becomes enabled after the given delay.
- *
- * @param command - the SafeRunnable to execute
- * @param delay - the time from now to delay execution
- * @param unit - the time unit of the delay parameter
- * @return a ScheduledFuture representing pending completion of the task and whose get() method
- * will return null upon completion
- */
- public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
- return super.schedule(command, delay, unit);
- }
-
- /**
- * Creates and executes a one-shot action that becomes enabled after the given delay.
- *
- * @param orderingKey - the key used for ordering
- * @param command - the SafeRunnable to execute
- * @param delay - the time from now to delay execution
- * @param unit - the time unit of the delay parameter
- * @return a ScheduledFuture representing pending completion of the task and whose get() method
- * will return null upon completion
- */
- public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
- return super.scheduleOrdered(orderingKey, command, delay, unit);
- }
-
- /**
- * Creates and executes a periodic action that becomes enabled first after
- * the given initial delay, and subsequently with the given period.
- *
- * <p>For more details check scheduleAtFixedRate in interface ScheduledExecutorService
- *
- * @param command - the SafeRunnable to execute
- * @param initialDelay - the time to delay first execution
- * @param period - the period between successive executions
- * @param unit - the time unit of the initialDelay and period parameters
- * @return a ScheduledFuture representing pending completion of the task, and whose get()
- * method will throw an exception upon cancellation
- */
- public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
- return super.scheduleAtFixedRate(command, initialDelay, period, unit);
- }
-
- /**
- * Creates and executes a periodic action that becomes enabled first after
- * the given initial delay, and subsequently with the given period.
- *
- * <p>For more details check scheduleAtFixedRate in interface ScheduledExecutorService
- *
- * @param orderingKey - the key used for ordering
- * @param command - the SafeRunnable to execute
- * @param initialDelay - the time to delay first execution
- * @param period - the period between successive executions
- * @param unit - the time unit of the initialDelay and period parameters
- * @return a ScheduledFuture representing pending completion of the task, and whose get() method
- * will throw an exception upon cancellation
- */
- public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
- long period, TimeUnit unit) {
- return super.scheduleAtFixedRateOrdered(orderingKey, command, initialDelay, period, unit);
- }
-
- /**
- * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
- * with the given delay between the termination of one execution and the commencement of the next.
- *
- * <p>For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
- *
- * @param command - the SafeRunnable to execute
- * @param initialDelay - the time to delay first execution
- * @param delay - the delay between the termination of one execution and the commencement of the next
- * @param unit - the time unit of the initialDelay and delay parameters
- * @return a ScheduledFuture representing pending completion of the task, and whose get() method
- * will throw an exception upon cancellation
- */
- public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
- TimeUnit unit) {
- return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
- }
-
- /**
- * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
- * with the given delay between the termination of one execution and the commencement of the next.
- *
- * <p>For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
- *
- * @param orderingKey - the key used for ordering
- * @param command - the SafeRunnable to execute
- * @param initialDelay - the time to delay first execution
- * @param delay - the delay between the termination of one execution and the commencement of the next
- * @param unit - the time unit of the initialDelay and delay parameters
- * @return a ScheduledFuture representing pending completion of the task, and whose get() method
- * will throw an exception upon cancellation
- */
- public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
- long delay, TimeUnit unit) {
- return super.scheduleWithFixedDelayOrdered(orderingKey, command, initialDelay, delay, unit);
- }
-
- /**
- * Generic callback implementation which will run the
- * callback in the thread which matches the ordering key.
- */
- public abstract static class OrderedSafeGenericCallback<T>
- implements GenericCallback<T> {
- private static final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class);
-
- private final OrderedSafeExecutor executor;
- private final long orderingKey;
-
- /**
- * @param executor The executor on which to run the callback
- * @param orderingKey Key used to decide which thread the callback
- * should run on.
- */
- public OrderedSafeGenericCallback(OrderedSafeExecutor executor, long orderingKey) {
- this.executor = executor;
- this.orderingKey = orderingKey;
- }
-
- @Override
- public final void operationComplete(final int rc, final T result) {
- // During closing, callbacks that error out might try to submit to
- // the scheduler again. If the submission would go to the same thread, we
- // don't need to submit to the executor again. This is also an optimization
- // for callback submission.
- if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
- safeOperationComplete(rc, result);
- } else {
- try {
- executor.submitOrdered(orderingKey, new SafeRunnable() {
- @Override
- public void safeRun() {
- safeOperationComplete(rc, result);
- }
- @Override
- public String toString() {
- return String.format("Callback(key=%s, name=%s)",
- orderingKey,
- OrderedSafeGenericCallback.this);
- }
- });
- } catch (RejectedExecutionException re) {
- LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
- }
- }
- }
-
- public abstract void safeOperationComplete(int rc, T result);
- }
-}
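
The "ordering guarantee on the key" described in the Javadoc of the removed class can be sketched with plain JDK classes. This is a minimal illustration under stated assumptions, not the BookKeeper implementation: the class `KeyedExecutorSketch` and its `chooseThread`/`executeOrdered` methods are hypothetical stand-ins that only borrow the names used in this commit, and the hash-based key-to-thread mapping is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch only; not the BookKeeper OrderedExecutor. */
public class KeyedExecutorSketch {
    // One single-threaded executor per slot; tasks in a slot run in FIFO order.
    private final ExecutorService[] threads;

    public KeyedExecutorSketch(int numThreads) {
        threads = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Maps a key to one slot (assumed mapping: hash modulo thread count). */
    ExecutorService chooseThread(Object orderingKey) {
        return threads[Math.floorMod(orderingKey.hashCode(), threads.length)];
    }

    /** Fire-and-forget, like Executor.execute(): no future is returned. */
    public void executeOrdered(Object orderingKey, Runnable r) {
        chooseThread(orderingKey).execute(r);
    }

    /** Stops accepting tasks and waits for the queued ones to finish. */
    public void shutdownAndWait() throws InterruptedException {
        for (ExecutorService t : threads) {
            t.shutdown();
        }
        for (ExecutorService t : threads) {
            t.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    /** Submits 100 tasks under one key and returns the order they ran in. */
    public static List<Integer> demo() throws InterruptedException {
        KeyedExecutorSketch executor = new KeyedExecutorSketch(4);
        List<Integer> seen = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 100; i++) {
            final int n = i;
            executor.executeOrdered("ledger-1", () -> seen.add(n));
        }
        executor.shutdownAndWait();
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

Because every task for the same key lands on the same single-threaded executor, tasks sharing a key run in submission order, while tasks for different keys may run in parallel on other threads.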
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index e686742..5efd7bd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -29,8 +29,10 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.base.Optional;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -41,15 +43,16 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.meta.LedgerIdGenerator;
@@ -61,7 +64,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
@@ -76,8 +78,8 @@ public abstract class MockBookKeeperTestCase {
private static final Logger LOG = LoggerFactory.getLogger(MockBookKeeperTestCase.class);
- protected ScheduledExecutorService scheduler;
- protected OrderedSafeExecutor executor;
+ protected OrderedScheduler scheduler;
+ protected OrderedExecutor executor;
protected BookKeeper bk;
protected BookieClient bookieClient;
protected LedgerManager ledgerManager;
@@ -128,8 +130,8 @@ public abstract class MockBookKeeperTestCase {
mockLedgerData = new ConcurrentHashMap<>();
mockNextLedgerId = new AtomicLong(1);
fencedLedgers = new ConcurrentSkipListSet<>();
- scheduler = new ScheduledThreadPoolExecutor(4);
- executor = OrderedSafeExecutor.newBuilder().build();
+ scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(4).name("bk-test").build();
+ executor = OrderedExecutor.newBuilder().build();
bookieWatcher = mock(BookieWatcher.class);
bookieClient = mock(BookieClient.class);
@@ -312,7 +314,7 @@ public abstract class MockBookKeeperTestCase {
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
Long ledgerId = (Long) args[0];
- executor.submitOrdered(ledgerId, () -> {
+ executor.executeOrdered(ledgerId, () -> {
BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[1];
LedgerMetadata ledgerMetadata = mockLedgerMetadataRegistry.get(ledgerId);
if (ledgerMetadata == null) {
@@ -330,7 +332,7 @@ public abstract class MockBookKeeperTestCase {
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
Long ledgerId = (Long) args[0];
- executor.submitOrdered(ledgerId, () -> {
+ executor.executeOrdered(ledgerId, () -> {
BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
if (mockLedgerMetadataRegistry.remove(ledgerId) != null) {
cb.operationComplete(BKException.Code.OK, null);
@@ -368,7 +370,7 @@ public abstract class MockBookKeeperTestCase {
Object[] args = invocation.getArguments();
BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
Long ledgerId = (Long) args[0];
- executor.submitOrdered(ledgerId, () -> {
+ executor.executeOrdered(ledgerId, () -> {
LedgerMetadata ledgerMetadata = (LedgerMetadata) args[1];
mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(ledgerMetadata));
cb.operationComplete(BKException.Code.OK, null);
@@ -384,7 +386,7 @@ public abstract class MockBookKeeperTestCase {
Long ledgerId = (Long) args[0];
LedgerMetadata metadata = (LedgerMetadata) args[1];
BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2];
- executor.submitOrdered(ledgerId, () -> {
+ executor.executeOrdered(ledgerId, () -> {
mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(metadata));
cb.operationComplete(BKException.Code.OK, null);
});
@@ -403,7 +405,7 @@ public abstract class MockBookKeeperTestCase {
(BookkeeperInternalCallbacks.ReadEntryCallback) args[3];
boolean fenced = (((Integer) args[5]) & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING;
- executor.submitOrdered(ledgerId, () -> {
+ executor.executeOrdered(ledgerId, () -> {
DigestManager macManager = null;
try {
macManager = getDigestType(ledgerId);
@@ -478,7 +480,7 @@ public abstract class MockBookKeeperTestCase {
boolean isRecoveryAdd =
((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD;
- executor.submitOrdered(ledgerId, () -> {
+ executor.executeOrdered(ledgerId, () -> {
byte[] entry;
try {
entry = extractEntryPayload(ledgerId, entryId, toSend);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index bf6c230..a91dffa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
@@ -41,7 +42,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCall
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -56,7 +56,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
private static final Logger LOG = LoggerFactory.getLogger(TestGetBookieInfoTimeout.class);
DigestType digestType;
public EventLoopGroup eventLoopGroup;
- public OrderedSafeExecutor executor;
+ public OrderedExecutor executor;
private ScheduledExecutorService scheduler;
public TestGetBookieInfoTimeout() {
@@ -69,7 +69,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
super.setUp();
eventLoopGroup = new NioEventLoopGroup();
- executor = OrderedSafeExecutor.newBuilder()
+ executor = OrderedExecutor.newBuilder()
.name("BKClientOrderedSafeExecutor")
.numThreads(2)
.build();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 59811d1..a07acef 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.auth.TestAuth;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -49,7 +50,6 @@ import org.apache.bookkeeper.proto.BookieProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.Test;
/**
@@ -64,7 +64,7 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
ExtensionRegistry extRegistry = ExtensionRegistry.newInstance();
ClientAuthProvider.Factory authProvider;
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient")
+ OrderedExecutor executor = OrderedExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient")
.build();
public TestBackwardCompatCMS42() throws Exception {
@@ -191,7 +191,7 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
final CountDownLatch connected = new CountDownLatch(1);
CompatClient42(ClientConfiguration conf,
- OrderedSafeExecutor executor,
+ OrderedExecutor executor,
EventLoopGroup eventLoopGroup,
BookieSocketAddress addr,
ClientAuthProvider.Factory authProviderFactory,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 90788b1..d96dce1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -46,7 +47,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.junit.Test;
import org.slf4j.Logger;
@@ -79,7 +79,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
@Test
public void testConnectCloseRace() throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- OrderedSafeExecutor executor = getOrderedSafeExecutor();
+ OrderedExecutor executor = getOrderedSafeExecutor();
BookieSocketAddress addr = getBookie(0);
for (int i = 0; i < 1000; i++) {
@@ -98,8 +98,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
executor.shutdown();
}
- public OrderedSafeExecutor getOrderedSafeExecutor() {
- return OrderedSafeExecutor.newBuilder()
+ public OrderedExecutor getOrderedSafeExecutor() {
+ return OrderedExecutor.newBuilder()
.name("PCBC")
.numThreads(1)
.traceTaskExecution(true)
@@ -122,7 +122,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
}
};
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- OrderedSafeExecutor executor = getOrderedSafeExecutor();
+ OrderedExecutor executor = getOrderedSafeExecutor();
BookieSocketAddress addr = getBookie(0);
for (int i = 0; i < 100; i++) {
@@ -154,7 +154,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
};
final int iterations = 100000;
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- OrderedSafeExecutor executor = getOrderedSafeExecutor();
+ OrderedExecutor executor = getOrderedSafeExecutor();
BookieSocketAddress addr = getBookie(0);
final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
@@ -250,7 +250,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
bs.add(startBookie(conf, delayBookie));
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- final OrderedSafeExecutor executor = getOrderedSafeExecutor();
+ final OrderedExecutor executor = getOrderedSafeExecutor();
BookieSocketAddress addr = getBookie(0);
final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
@@ -268,7 +268,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
- executor.submitOrdered(1, new SafeRunnable() {
+ executor.executeOrdered(1, new SafeRunnable() {
@Override
public void safeRun() {
cb.readEntryComplete(rc, 1, 1, null, null);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 7ecce0b..29c2e3c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -54,7 +55,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -68,7 +68,7 @@ public class BookieClientTest {
public int port = 13645;
public EventLoopGroup eventLoopGroup;
- public OrderedSafeExecutor executor;
+ public OrderedExecutor executor;
private ScheduledExecutorService scheduler;
@Before
@@ -85,7 +85,7 @@ public class BookieClientTest {
bs = new BookieServer(conf);
bs.start();
eventLoopGroup = new NioEventLoopGroup();
- executor = OrderedSafeExecutor.newBuilder()
+ executor = OrderedExecutor.newBuilder()
.name("BKClientOrderedSafeExecutor")
.numThreads(2)
.build();
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 1886c2a..c5f7f07 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -123,6 +123,9 @@
<Match>
<Class name="~org\.apache\.bookkeeper\.stats\.generated.*" />
</Match>
+ <Match>
+ <Class name="~org\.apache\.bookkeeper\.common\.generated.*" />
+ </Match>
<!-- modules under stream/ -->
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java
new file mode 100644
index 0000000..66ce59b
--- /dev/null
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Microbenchmarks for different executor providers.
+ */
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Threads(16)
+@Fork(1)
+@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+public class OrderedExecutorBenchmark {
+
+ private static Map<String, Supplier<ExecutorService>> providers = ImmutableMap.of( //
+ "JDK-ThreadPool", () -> Executors.newFixedThreadPool(1),
+ "OrderedExecutor", () -> OrderedExecutor.newBuilder().numThreads(1).build(), //
+ "OrderedScheduler", () -> OrderedScheduler.newSchedulerBuilder().numThreads(1).build());
+
+ @State(Scope.Benchmark)
+ public static class TestState {
+ @Param({ "JDK-ThreadPool", "OrderedExecutor", "OrderedScheduler" })
+ private String executorName;
+
+ private ExecutorService executor;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ executor = providers.get(executorName).get();
+ }
+
+ @TearDown(Level.Trial)
+ public void teardown() {
+ executor.shutdown();
+ }
+ }
+
+ @Benchmark
+ public void submitAndWait(TestState s) throws Exception {
+ s.executor.submit(() -> {
+ }).get();
+ }
+}
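
The `submitAndWait` benchmark above measures a full round trip through `submit(...).get()`. As a rough illustration of the JDK distinction this commit relies on (`submit()` returns a future, `execute()` returns void), the following sketch uses only standard `java.util.concurrent` classes. The class name `SubmitVsExecuteSketch` and its helper methods are hypothetical and are not part of the commit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch contrasting submit() and execute() on a JDK executor. */
public class SubmitVsExecuteSketch {

    /** submit() allocates a Future; get() blocks until the task has run. */
    static boolean roundTripWithSubmit(ExecutorService executor) throws Exception {
        Future<?> future = executor.submit(() -> { });
        future.get(5, TimeUnit.SECONDS); // yields null for a Runnable task
        return future.isDone();
    }

    /** execute() returns void; completion must be signalled by the task itself. */
    static boolean roundTripWithExecute(ExecutorService executor) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(done::countDown);
        return done.await(5, TimeUnit.SECONDS);
    }

    /** Runs both round trips on a single-threaded pool. */
    public static boolean demo() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return roundTripWithSubmit(executor) && roundTripWithExecute(executor);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("both round trips completed: " + demo());
    }
}
```

On paths where the caller never inspects the result, the `execute()` form avoids creating a future object per task, which is the motivation given in the commit message for renaming `submitOrdered()` to `executeOrdered()`.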
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 6adcbc4..3c8dfb7 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -391,6 +391,7 @@ class BKAsyncLogReader implements AsyncLogReader, SafeRunnable, AsyncNotificatio
return readInternal(1, 0, TimeUnit.MILLISECONDS).thenApply(READ_NEXT_MAP_FUNCTION);
}
+ @Override
public synchronized CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries) {
return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
}
@@ -477,7 +478,7 @@ class BKAsyncLogReader implements AsyncLogReader, SafeRunnable, AsyncNotificatio
long prevCount = scheduleCountUpdater.getAndIncrement(this);
if (0 == prevCount) {
scheduleDelayStopwatch.reset().start();
- scheduler.submitOrdered(streamName, this);
+ scheduler.executeOrdered(streamName, this);
}
}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index a0dd6ad..c9bca44 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -394,7 +394,7 @@ class ReadAheadEntryReader implements
}
}
try {
- scheduler.submitOrdered(streamName, runnable);
+ scheduler.executeOrdered(streamName, runnable);
} catch (RejectedExecutionException ree) {
logger.debug("Failed to submit and execute an operation for readahead entry reader of {}",
streamName, ree);
@@ -449,7 +449,7 @@ class ReadAheadEntryReader implements
// use runnable here instead of CloseableRunnable,
// because we need this to be executed
try {
- scheduler.submitOrdered(streamName, () -> unsafeAsyncClose(closeFuture));
+ scheduler.executeOrdered(streamName, () -> unsafeAsyncClose(closeFuture));
} catch (RejectedExecutionException ree) {
logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}",
streamName, ree);
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
index 5a9ee1a..d78b455 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -212,7 +212,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
if (closed) {
return;
}
- scheduler.submitOrdered(key, r);
+ scheduler.executeOrdered(key, r);
} finally {
closeLock.readLock().unlock();
}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index 898c113..813e974 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -691,7 +691,7 @@ public class BKLogSegmentEntryReader implements SafeRunnable, LogSegmentEntryRea
long prevCount = scheduleCountUpdater.getAndIncrement(this);
if (0 == prevCount) {
- scheduler.submitOrdered(getSegment().getLogSegmentId(), this);
+ scheduler.executeOrdered(getSegment().getLogSegmentId(), this);
}
}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index 7948084..107c5e3 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -135,6 +135,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
* Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter
* list--is executed synchronously, but the lock wait itself doesn't block.
*/
+ @Override
public synchronized CompletableFuture<ZKDistributedLock> asyncAcquire() {
if (null != lockAcquireFuture) {
return FutureUtils.exception(
@@ -145,7 +146,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
if (null == throwable || !(throwable instanceof CancellationException)) {
return;
}
- lockStateExecutor.submitOrdered(lockPath, () -> asyncClose());
+ lockStateExecutor.executeOrdered(lockPath, () -> asyncClose());
});
final Stopwatch stopwatch = Stopwatch.createStarted();
promise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
@@ -165,7 +166,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
}
});
this.lockAcquireFuture = promise;
- lockStateExecutor.submitOrdered(
+ lockStateExecutor.executeOrdered(
lockPath, () -> doAsyncAcquireWithSemaphore(promise, lockTimeout));
return promise;
}
@@ -293,6 +294,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
*
* @throws LockingException if the lock attempt fails
*/
+ @Override
public synchronized void checkOwnershipAndReacquire() throws LockingException {
if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
throw new LockingException(lockPath, "check ownership before acquiring");
@@ -315,6 +317,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
*
* @throws LockingException if the lock attempt fails
*/
+ @Override
public synchronized void checkOwnership() throws LockingException {
if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
throw new LockingException(lockPath, "check ownership before acquiring");
@@ -426,7 +429,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
FutureUtils.complete(closePromise, null);
}
}, lockStateExecutor.chooseThread(lockPath));
- lockStateExecutor.submitOrdered(
+ lockStateExecutor.executeOrdered(
lockPath, () -> closeWaiter(lockWaiter, closeWaiterFuture));
return closePromise;
}
@@ -434,7 +437,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
void internalReacquireLock(final AtomicInteger numRetries,
final long lockTimeout,
final CompletableFuture<ZKDistributedLock> reacquirePromise) {
- lockStateExecutor.submitOrdered(
+ lockStateExecutor.executeOrdered(
lockPath, () -> doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise));
}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index 3d79336..84c516c 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -168,6 +168,7 @@ class ZKSessionLock implements SessionLock {
}
static final Comparator<String> MEMBER_COMPARATOR = new Comparator<String>() {
+ @Override
public int compare(String o1, String o2) {
int l1 = parseMemberID(o1);
int l2 = parseMemberID(o2);
@@ -374,6 +375,7 @@ class ZKSessionLock implements SessionLock {
return lockId;
}
+ @Override
public boolean isLockExpired() {
return lockState.isExpiredOrClosing();
}
@@ -392,7 +394,7 @@ class ZKSessionLock implements SessionLock {
* function to execute a lock action
*/
protected void executeLockAction(final int lockEpoch, final LockAction func) {
- lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+ lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
@Override
public void safeRun() {
if (getEpoch() == lockEpoch) {
@@ -431,7 +433,7 @@ class ZKSessionLock implements SessionLock {
*/
protected <T> void executeLockAction(final int lockEpoch,
final LockAction func, final CompletableFuture<T> promise) {
- lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+ lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
@Override
public void safeRun() {
int currentEpoch = getEpoch();
@@ -554,7 +556,7 @@ class ZKSessionLock implements SessionLock {
@Override
public void processResult(final int rc, String path, Object ctx,
final List<String> children, Stat stat) {
- lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+ lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
@Override
public void safeRun() {
if (!lockState.inState(State.INIT)) {
@@ -648,7 +650,7 @@ class ZKSessionLock implements SessionLock {
private boolean checkOrClaimLockOwner(final Pair<String, Long> currentOwner,
final CompletableFuture<String> result) {
if (lockId.compareTo(currentOwner) != 0 && !lockContext.hasLockId(currentOwner)) {
- lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+ lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
@Override
public void safeRun() {
result.complete(currentOwner.getLeft());
@@ -878,7 +880,7 @@ class ZKSessionLock implements SessionLock {
// Use lock executor here rather than lock action, because we want this operation to be applied
// whether the epoch has changed or not. The member node is EPHEMERAL_SEQUENTIAL so there's no
// risk of an ABA problem where we delete and recreate a node and then delete it again here.
- lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+ lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
@Override
public void safeRun() {
acquireFuture.completeExceptionally(cause);
@@ -980,7 +982,7 @@ class ZKSessionLock implements SessionLock {
zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(final int rc, final String path, Object ctx) {
- lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() {
+ lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() {
@Override
public void safeRun() {
if (KeeperException.Code.OK.intValue() == rc) {
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
index 517b0df..437bb6f 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
@@ -65,6 +65,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
private BookKeeperClient bkc;
private ZooKeeperClient zkc;
+ @Override
@Before
public void setup() throws Exception {
super.setup();
@@ -94,6 +95,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
.build();
}
+ @Override
@After
public void teardown() throws Exception {
if (null != bkc) {
@@ -138,15 +140,8 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
private void ensureOrderSchedulerEmpty(String streamName) throws Exception {
final CompletableFuture<Void> promise = new CompletableFuture<Void>();
- scheduler.submitOrdered(streamName, () -> {
+ scheduler.executeOrdered(streamName, () -> {
FutureUtils.complete(promise, null);
- // the following line is needed for oraclejdk9 to avoid following exception
- // ```
- // incompatible types: inference variable T has incompatible bounds
- // upper bounds: java.lang.Object
- // lower bounds: void
- // ```
- return (Void) null;
});
Utils.ioResult(promise);
}
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
index 901d2a0..8a9d9c2 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
@@ -779,7 +779,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
// expire session
ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
// submit a runnable to lock state executor to ensure any state changes happened when session expired
- lockStateExecutor.submitOrdered(lockPath, () -> expiredLatch.countDown());
+ lockStateExecutor.executeOrdered(lockPath, () -> expiredLatch.countDown());
expiredLatch.await();
// no watcher was registered if never acquired lock successfully
assertEquals(State.INIT, lock.getLockState());
@@ -1216,7 +1216,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
} else {
ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
final CountDownLatch latch = new CountDownLatch(1);
- lockStateExecutor.submitOrdered(lockPath, () -> latch.countDown());
+ lockStateExecutor.executeOrdered(lockPath, () -> latch.countDown());
latch.await();
children = getLockWaiters(zkc, lockPath);
assertEquals(0, children.size());
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
index 6155cba..39e00c0 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
@@ -79,7 +79,7 @@ public final class FailRequestStorageContainer implements StorageContainer {
private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
CompletableFuture<T> future = FutureUtils.createFuture();
- scheduler.submitOrdered(scId, () -> {
+ scheduler.executeOrdered(scId, () -> {
future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
});
return future;
diff --git a/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java b/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java
index e462d33..eba29f1 100644
--- a/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java
+++ b/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java
@@ -50,7 +50,7 @@ public class BookKeeperServerShadedJarTest {
@Test
public void testBookKeeperCommon() throws Exception {
- Class.forName("org.apache.bookkeeper.util.OrderedSafeExecutor");
+ Class.forName("org.apache.bookkeeper.common.util.OrderedExecutor");
assertTrue(true);
}
diff --git a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
index 45be333..c4519df 100644
--- a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
+++ b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
@@ -85,13 +85,13 @@ public class DistributedLogCoreShadedJarTest {
@Test(expected = ClassNotFoundException.class)
public void testBookKeeperCommon() throws Exception {
- Class.forName("org.apache.bookkeeper.util.OrderedSafeExecutor");
+ Class.forName("org.apache.bookkeeper.common.util.OrderedExecutor");
assertTrue(true);
}
@Test
public void testBookKeeperCommonShade() throws Exception {
- Class.forName("dlshade.org.apache.bookkeeper.util.OrderedSafeExecutor");
+ Class.forName("dlshade.org.apache.bookkeeper.common.util.OrderedExecutor");
assertTrue(true);
}
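The call sites changed above share one pattern: a `Runnable` is handed to `executeOrdered(key, task)`, and the caller learns about completion through a promise or latch that the task itself completes, not through a returned future. The sketch below illustrates that pattern using only JDK classes. `KeyedExecutor`, `runInOrder`, and the slot-selection rule are hypothetical names and choices made for illustration; this is not the BookKeeper `OrderedExecutor` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedExecuteSketch {

    /** Hypothetical stand-in for an ordered executor: tasks with the same key share one thread. */
    static final class KeyedExecutor implements AutoCloseable {
        private final ExecutorService[] threads;

        KeyedExecutor(int numThreads) {
            threads = new ExecutorService[numThreads];
            for (int i = 0; i < numThreads; i++) {
                threads[i] = Executors.newSingleThreadExecutor();
            }
        }

        /** Fire-and-forget: returns void, like ExecutorService.execute(), so no future is allocated. */
        void executeOrdered(Object key, Runnable task) {
            // Assumption: pick the thread by hashing the key; equal keys always map to the same slot.
            int slot = Math.floorMod(key.hashCode(), threads.length);
            threads[slot].execute(task);
        }

        @Override
        public void close() {
            for (ExecutorService t : threads) {
                t.shutdown();
            }
        }
    }

    /** Runs {@code count} tasks under one key and returns the order in which they executed. */
    static List<Integer> runInOrder(String key, int count) throws Exception {
        List<Integer> seen = new ArrayList<>();
        CompletableFuture<Void> drained = new CompletableFuture<>();
        try (KeyedExecutor executor = new KeyedExecutor(4)) {
            for (int i = 0; i < count; i++) {
                final int n = i;
                executor.executeOrdered(key, () -> seen.add(n));
            }
            // Same idea as ensureOrderSchedulerEmpty(): a final task under the same key
            // completes a promise, so everything queued before it has already run.
            executor.executeOrdered(key, () -> drained.complete(null));
            drained.get();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runInOrder("stream-1", 5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because the lambda is passed where a `Runnable` is expected, it needs no `return (Void) null;` workaround of the kind removed from `TestReadAheadEntryReader` above.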