You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/10/03 10:46:20 UTC
[bookkeeper] branch master updated: ISSUE #527: Introduce backoff
and retry utilities
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 8399707 ISSUE #527: Introduce backoff and retry utilities
8399707 is described below
commit 8399707f054e5d387e1af18384efab4cd1b26e57
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Tue Oct 3 12:46:14 2017 +0200
ISSUE #527: Introduce backoff and retry utilities
Descriptions of the changes in this PR:
Mainly want to introduce backoff and retries utilities:
Backoff implements various backoff strategies, and is intended to determine the duration after which a task is to be retried.
Retries is a util class for supporting retries with customized backoff.
Author: Jia Zhai <zh...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #528 from zhaijack/retries_backoff, closes #527
---
bookkeeper-common/pom.xml | 11 +
.../common/concurrent/FutureEventListener.java | 44 +++
.../bookkeeper/common/concurrent/FutureUtils.java | 368 ++++++++++++++++++++
.../bookkeeper/common/concurrent/package-info.java | 22 ++
.../bookkeeper/common/stats/OpStatsListener.java | 55 +++
.../bookkeeper/common/stats/package-info.java | 21 ++
.../org/apache/bookkeeper/common/util/Backoff.java | 288 +++++++++++++++
.../apache/bookkeeper/common}/util/MathUtils.java | 23 +-
.../bookkeeper/common/util/OrderedScheduler.java | 193 +++++-----
.../org/apache/bookkeeper/common/util/Retries.java | 142 ++++++++
.../bookkeeper/common}/util/SafeRunnable.java | 56 +--
.../apache/bookkeeper/common/util/StreamUtil.java | 70 ++++
.../common/concurrent/TestFutureUtils.java | 387 +++++++++++++++++++++
.../apache/bookkeeper/common/util/TestBackoff.java | 191 ++++++++++
.../bookkeeper/common/util/TestMathUtils.java | 61 ++++
bookkeeper-server/pom.xml | 15 +-
.../main/java/org/apache/bookkeeper/util/Main.java | 54 ---
.../java/org/apache/bookkeeper/util/MathUtils.java | 84 +----
.../bookkeeper/util/OrderedSafeExecutor.java | 271 +--------------
.../org/apache/bookkeeper/util/SafeRunnable.java | 23 +-
.../main/resources/bookkeeper/findbugsExclude.xml | 21 ++
pom.xml | 2 +
22 files changed, 1829 insertions(+), 573 deletions(-)
diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 4961e4a..b369630 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -36,6 +36,17 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>${google.code.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureEventListener.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureEventListener.java
new file mode 100644
index 0000000..6b71800
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureEventListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.concurrent;
+
+import java.util.concurrent.CompletionException;
+import java.util.function.BiConsumer;
+
+/**
+ * Provide similar interface (as twitter future) over java future.
+ */
+public interface FutureEventListener<T> extends BiConsumer<T, Throwable> {
+
+ void onSuccess(T value);
+
+ void onFailure(Throwable cause);
+
+ @Override
+ default void accept(T t, Throwable throwable) {
+ if (null != throwable) {
+ if (throwable instanceof CompletionException && null != throwable.getCause()) {
+ onFailure(throwable.getCause());
+ } else {
+ onFailure(throwable);
+ }
+ return;
+ }
+ onSuccess(t);
+ }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java
new file mode 100644
index 0000000..ab2d1ca
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.concurrent;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.stats.OpStatsListener;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+/**
+ * Future related utils.
+ */
+@Slf4j
+public final class FutureUtils {
+
+ private FutureUtils() {}
+
+ private static final Function<Throwable, Exception> DEFAULT_EXCEPTION_HANDLER = cause -> {
+ if (cause instanceof Exception) {
+ return (Exception) cause;
+ } else {
+ return new Exception(cause);
+ }
+ };
+
+ public static CompletableFuture<Void> Void() {
+ return value(null);
+ }
+
+ public static <T> T result(CompletableFuture<T> future) throws Exception {
+ return FutureUtils.result(future, DEFAULT_EXCEPTION_HANDLER);
+ }
+
+ public static <T> T result(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) throws Exception {
+ return FutureUtils.result(future, DEFAULT_EXCEPTION_HANDLER, timeout, timeUnit);
+ }
+
+ @SneakyThrows(InterruptedException.class)
+ public static <T, ExceptionT extends Throwable> T result(
+ CompletableFuture<T> future, Function<Throwable, ExceptionT> exceptionHandler) throws ExceptionT {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
+ } catch (ExecutionException e) {
+ ExceptionT cause = exceptionHandler.apply(e.getCause());
+ if (null == cause) {
+ return null;
+ } else {
+ throw cause;
+ }
+ }
+ }
+
+ @SneakyThrows(InterruptedException.class)
+ public static <T, ExceptionT extends Throwable> T result(
+ CompletableFuture<T> future,
+ Function<Throwable, ExceptionT> exceptionHandler,
+ long timeout,
+ TimeUnit timeUnit) throws ExceptionT, TimeoutException {
+ try {
+ return future.get(timeout, timeUnit);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
+ } catch (ExecutionException e) {
+ ExceptionT cause = exceptionHandler.apply(e.getCause());
+ if (null == cause) {
+ return null;
+ } else {
+ throw cause;
+ }
+ }
+ }
+
+ public static <T> CompletableFuture<T> createFuture() {
+ return new CompletableFuture<T>();
+ }
+
+ public static <T> CompletableFuture<T> value(T value) {
+ return CompletableFuture.completedFuture(value);
+ }
+
+ public static <T> CompletableFuture<T> exception(Throwable cause) {
+ CompletableFuture<T> future = FutureUtils.createFuture();
+ future.completeExceptionally(cause);
+ return future;
+ }
+
+ public static <T> void complete(CompletableFuture<T> result,
+ T value) {
+ if (null == result) {
+ return;
+ }
+ result.complete(value);
+ }
+
+ public static <T> void completeExceptionally(CompletableFuture<T> result,
+ Throwable cause) {
+ if (null == result) {
+ return;
+ }
+ result.completeExceptionally(cause);
+ }
+
+ /**
+ * Run the {@code action} when the {@code future} completes, on the scheduler thread chosen by
+ * the {@code scheduleKey}.
+ *
+ * @param future future to complete
+ * @param action action to execute when complete
+ * @param scheduler scheduler to execute the action.
+ * @param scheduleKey key to choose the thread to execute the action
+ * @param <T>
+ * @return
+ */
+ public static <T> CompletableFuture<T> whenCompleteAsync(
+ CompletableFuture<T> future,
+ BiConsumer<? super T, ? super Throwable> action,
+ OrderedScheduler scheduler,
+ Object scheduleKey) {
+ return future.whenCompleteAsync(action, scheduler.chooseThread(scheduleKey));
+ }
+
+ public static <T> CompletableFuture<List<T>> collect(List<CompletableFuture<T>> futureList) {
+ CompletableFuture<Void> finalFuture =
+ CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
+ return finalFuture.thenApply(result ->
+ futureList
+ .stream()
+ .map(CompletableFuture::join)
+ .collect(Collectors.toList()));
+ }
+
+ public static <T> void proxyTo(CompletableFuture<T> src,
+ CompletableFuture<T> target) {
+ src.whenComplete((value, cause) -> {
+ if (null == cause) {
+ target.complete(value);
+ } else {
+ target.completeExceptionally(cause);
+ }
+ });
+ }
+
+ //
+ // Process futures
+ //
+
+ private static class ListFutureProcessor<T, R>
+ implements FutureEventListener<R>, Runnable {
+
+ private volatile boolean done = false;
+ private final Iterator<T> itemsIter;
+ private final Function<T, CompletableFuture<R>> processFunc;
+ private final CompletableFuture<List<R>> promise;
+ private final List<R> results;
+ private final ExecutorService callbackExecutor;
+
+ ListFutureProcessor(List<T> items,
+ Function<T, CompletableFuture<R>> processFunc,
+ ExecutorService callbackExecutor) {
+ this.itemsIter = items.iterator();
+ this.processFunc = processFunc;
+ this.promise = new CompletableFuture<>();
+ this.results = Lists.newArrayListWithExpectedSize(items.size());
+ this.callbackExecutor = callbackExecutor;
+ }
+
+ @Override
+ public void onSuccess(R value) {
+ results.add(value);
+ if (null == callbackExecutor) {
+ run();
+ } else {
+ callbackExecutor.submit(this);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ done = true;
+
+ if (null == callbackExecutor) {
+ promise.completeExceptionally(cause);
+ } else {
+ callbackExecutor.submit((Runnable) () -> promise.completeExceptionally(cause));
+ }
+ }
+
+ @Override
+ public void run() {
+ if (done) {
+ log.debug("ListFutureProcessor is interrupted.");
+ return;
+ }
+ if (!itemsIter.hasNext()) {
+ promise.complete(results);
+ done = true;
+ return;
+ }
+ processFunc.apply(itemsIter.next()).whenComplete(this);
+ }
+ }
+
+ /**
+ * Process the list of items one by one using the process function <i>processFunc</i>.
+ * The process will be stopped immediately if it fails on processing any one.
+ *
+ * @param collection list of items
+ * @param processFunc process function
+ * @param callbackExecutor executor to process the item
+ * @return a future that presents the list of processed results
+ */
+ public static <T, R> CompletableFuture<List<R>> processList(List<T> collection,
+ Function<T, CompletableFuture<R>> processFunc,
+ @Nullable ExecutorService callbackExecutor) {
+ ListFutureProcessor<T, R> processor =
+ new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor);
+ if (null != callbackExecutor) {
+ callbackExecutor.submit(processor);
+ } else {
+ processor.run();
+ }
+ return processor.promise;
+ }
+
+ /**
+ * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period.
+ * If the promise has been satisfied before raising, it won't change the state of the promise.
+ *
+ * @param promise promise to raise exception
+ * @param timeout timeout period
+ * @param unit timeout period unit
+ * @param cause cause to raise
+ * @param scheduler scheduler to execute raising exception
+ * @param key the submit key used by the scheduler
+ * @return the promise applied with the raise logic
+ */
+ public static <T> CompletableFuture<T> within(final CompletableFuture<T> promise,
+ final long timeout,
+ final TimeUnit unit,
+ final Throwable cause,
+ final OrderedScheduler scheduler,
+ final Object key) {
+ if (timeout < 0 || promise.isDone()) {
+ return promise;
+ }
+ // schedule a timeout to raise timeout exception
+ final java.util.concurrent.ScheduledFuture<?> task = scheduler.scheduleOrdered(key, () -> {
+ if (!promise.isDone() && promise.completeExceptionally(cause)) {
+ log.info("Raise exception", cause);
+ }
+ }, timeout, unit);
+ // when the promise is satisfied, cancel the timeout task
+ promise.whenComplete((value, throwable) -> {
+ if (!task.cancel(true)) {
+ log.debug("Failed to cancel the timeout task");
+ }
+ }
+ );
+ return promise;
+ }
+
+ /**
+ * Ignore exception from the <i>future</i>.
+ *
+ * @param future the original future
+ * @return a transformed future ignores exceptions
+ */
+ public static <T> CompletableFuture<Void> ignore(CompletableFuture<T> future) {
+ return ignore(future, null);
+ }
+
+ /**
+ * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions.
+ *
+ * @param future the original future
+ * @param errorMsg the error message to log on exceptions
+ * @return a transformed future ignores exceptions
+ */
+ public static <T> CompletableFuture<Void> ignore(CompletableFuture<T> future,
+ final String errorMsg) {
+ final CompletableFuture<Void> promise = new CompletableFuture<Void>();
+ future.whenComplete(new FutureEventListener<T>() {
+ @Override
+ public void onSuccess(T value) {
+ promise.complete(null);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ if (null != errorMsg) {
+ log.error(errorMsg, cause);
+ }
+ promise.complete(null);
+ }
+ });
+ return promise;
+ }
+
+ public static <T> CompletableFuture<T> ensure(CompletableFuture<T> future,
+ Runnable ensureBlock) {
+ return future.whenComplete((value, cause) -> {
+ ensureBlock.run();
+ });
+ }
+
+ public static <T> CompletableFuture<T> rescue(CompletableFuture<T> future,
+ Function<Throwable, CompletableFuture<T>> rescueFuc) {
+ CompletableFuture<T> result = FutureUtils.createFuture();
+ future.whenComplete((value, cause) -> {
+ if (null == cause) {
+ result.complete(value);
+ return;
+ }
+ proxyTo(rescueFuc.apply(cause), result);
+ });
+ return result;
+ }
+
+ /**
+ * Add an event listener over <i>result</i> for collecting the operation stats.
+ *
+ * @param result result to listen on
+ * @param opStatsLogger stats logger to record operations stats
+ * @param stopwatch stop watch to time operation
+ * @param <T>
+ * @return result after registered the event listener
+ */
+ public static <T> CompletableFuture<T> stats(CompletableFuture<T> result,
+ OpStatsLogger opStatsLogger,
+ Stopwatch stopwatch) {
+ return result.whenComplete(new OpStatsListener<T>(opStatsLogger, stopwatch));
+ }
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/package-info.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/package-info.java
new file mode 100644
index 0000000..7f4d098
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes commonly useful in concurrent programming.
+ */
+package org.apache.bookkeeper.common.concurrent;
\ No newline at end of file
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/OpStatsListener.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/OpStatsListener.java
new file mode 100644
index 0000000..ca6eb74
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/OpStatsListener.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.stats;
+
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+/**
+ * A {@link FutureEventListener} monitors the stats for a given operation.
+ */
+public class OpStatsListener<T> implements FutureEventListener<T> {
+
+ private final OpStatsLogger opStatsLogger;
+ private final Stopwatch stopwatch;
+
+ public OpStatsListener(OpStatsLogger opStatsLogger, Stopwatch stopwatch) {
+ this.opStatsLogger = opStatsLogger;
+ if (null == stopwatch) {
+ this.stopwatch = Stopwatch.createStarted();
+ } else {
+ this.stopwatch = stopwatch;
+ }
+ }
+
+ public OpStatsListener(OpStatsLogger opStatsLogger) {
+ this(opStatsLogger, null);
+ }
+
+ @Override
+ public void onSuccess(T value) {
+ opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+ }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/package-info.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/package-info.java
new file mode 100644
index 0000000..f211381
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stats Related Utils.
+ */
+package org.apache.bookkeeper.common.stats;
\ No newline at end of file
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
new file mode 100644
index 0000000..085e4cd
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import lombok.Data;
+import lombok.ToString;
+
+/**
+ * Implements various backoff strategies.
+ *
+ * <p>Strategies are defined by a {@link java.util.stream.Stream} of durations
+ * and are intended to determine the duration after which a task is to be
+ * retried.
+ */
+public class Backoff {
+
+ private static final int MaxBitShift = 62;
+
+ /**
+ * Back off policy.
+ *
+ * <p>It defines a stream of time durations that will be used for backing off.
+ */
+ public interface Policy {
+
+ Policy NONE = () -> Stream.empty();
+
+ /**
+ * Convert the policy into a series of backoff values.
+ *
+ * @return a series of backoff values.
+ */
+ Stream<Long> toBackoffs();
+
+ }
+
+ /**
+ * A constant backoff policy.
+ */
+ @Data(staticConstructor = "of")
+ @ToString
+ public static class Constant implements Policy {
+
+ /**
+ * Create infinite constant backoff stream.
+ *
+ * <p>It is the infinite version of {@link #of(long, long)}.
+ *
+ * @param ms constant backoff time in milliseconds.
+ * @return constant backoff policy.
+ */
+ public static Constant of(long ms) {
+ return of(ms, -1);
+ }
+
+ private final long ms;
+ private final long limit;
+
+ @Override
+ public Stream<Long> toBackoffs() {
+ if (limit >= 0) {
+ return constant(ms).limit(limit);
+ } else {
+ return constant(ms);
+ }
+ }
+ }
+
+ /**
+ * A Jittered backoff policy.
+ *
+ * <p>It is an implementation of <a href="http://www.awsarchitectureblog.com/2015/03/backoff.html">Exponential Backoff and Jitter</a>.
+ */
+ @Data(staticConstructor = "of")
+ @ToString
+ public static class Jitter implements Policy {
+
+ enum Type {
+ DECORRELATED,
+ EQUAL,
+ EXPONENTIAL
+ }
+
+ /**
+ * Create infinite jittered backoff stream.
+ *
+ * <p>It is the infinite version of {@link #of(Type, long, long, long)}.
+ *
+ * @param type jittered backoff type
+ * @param startMs the start backoff time in milliseconds.
+ * @param maxMs the max backoff time in milliseconds.
+ * @return jittered backoff policy.
+ */
+ public static Jitter of(Type type, long startMs, long maxMs) {
+ return of(type, startMs, maxMs, -1);
+ }
+
+ private final Type type;
+ private final long startMs;
+ private final long maxMs;
+ private final long limit;
+
+ @Override
+ public Stream<Long> toBackoffs() {
+ Stream<Long> backoffStream;
+ switch (type) {
+ case DECORRELATED:
+ backoffStream = decorrelatedJittered(startMs, maxMs);
+ break;
+ case EQUAL:
+ backoffStream = equalJittered(startMs, maxMs);
+ break;
+ case EXPONENTIAL:
+ default:
+ backoffStream = exponentialJittered(startMs, maxMs);
+ break;
+ }
+ if (limit >= 0) {
+ return backoffStream.limit(limit);
+ } else {
+ return backoffStream;
+ }
+ }
+ }
+
+ /**
+ * An exponential backoff policy.
+ */
+ @Data(staticConstructor = "of")
+ @ToString
+ public static class Exponential implements Policy {
+
+ /**
+ * Create an infinite exponential backoff policy.
+ *
+ * <p>It is the infinite version of {@link #of(long, long, int, int)}.
+ *
+ * @param startMs start backoff time in milliseconds.
+ * @param maxMs max backoff time in milliseconds.
+ * @param multiplier the backoff multiplier
+ * @return the exponential backoff policy.
+ */
+ public static Exponential of(long startMs, long maxMs, int multiplier) {
+ return of(startMs, maxMs, multiplier, -1);
+ }
+
+ private final long startMs;
+ private final long maxMs;
+ private final int multiplier;
+ private final int limit;
+
+ @Override
+ public Stream<Long> toBackoffs() {
+ if (limit >= 0) {
+ return exponential(startMs, multiplier, maxMs).limit(limit);
+ } else {
+ return exponential(startMs, multiplier, maxMs);
+ }
+ }
+ }
+
+ /**
+ * Create a stream with constant backoffs.
+ *
+ * @param startMs initial backoff in milliseconds
+ * @return a stream with constant backoff values.
+ */
+ public static Stream<Long> constant(long startMs) {
+ return Stream.iterate(startMs, lastMs -> startMs);
+ }
+
+ /**
+ * Create a stream with exponential backoffs.
+ *
+ * @param startMs initial backoff in milliseconds.
+ * @param multiplier the multiplier for next backoff.
+ * @param maxMs max backoff in milliseconds.
+ * @return a stream with exponential backoffs.
+ */
+ public static Stream<Long> exponential(long startMs,
+ int multiplier,
+ long maxMs) {
+ return Stream.iterate(startMs, lastMs -> Math.min(lastMs * multiplier, maxMs));
+ }
+
+ /**
+ * Create a stream of exponential backoffs with jitters.
+ *
+ * <p>This is "full jitter" via http://www.awsarchitectureblog.com/2015/03/backoff.html
+ *
+ * @param startMs initial backoff in milliseconds.
+ * @param maxMs max backoff in milliseconds.
+ * @return a stream of exponential backoffs with jitters.
+ */
+ public static Stream<Long> exponentialJittered(long startMs,
+ long maxMs) {
+ final long startNanos = TimeUnit.NANOSECONDS.convert(startMs, TimeUnit.MILLISECONDS);
+ final long maxNanos = TimeUnit.NANOSECONDS.convert(maxMs, TimeUnit.MILLISECONDS);
+ final AtomicLong attempts = new AtomicLong(1);
+ return Stream.iterate(startMs, lastMs -> {
+ long shift = Math.min(attempts.get(), MaxBitShift);
+ long maxBackoffNanos = Math.min(maxNanos, startNanos * (1L << shift));
+ long randomMs = TimeUnit.MILLISECONDS.convert(
+ ThreadLocalRandom.current().nextLong(startNanos, maxBackoffNanos),
+ TimeUnit.NANOSECONDS);
+ attempts.incrementAndGet();
+ return randomMs;
+ });
+ }
+
+ /**
+ * Create infinite backoffs that have jitter with a random distribution
+ * between {@code startMs} and 3 times the previously selected value, capped at {@code maxMs}.
+ *
+ * <p>this is "decorrelated jitter" via http://www.awsarchitectureblog.com/2015/03/backoff.html
+ *
+ * @param startMs initial backoff in milliseconds
+ * @param maxMs max backoff in milliseconds
+ * @return a stream of jitter backoffs.
+ */
+ public static Stream<Long> decorrelatedJittered(long startMs,
+ long maxMs) {
+ final long startNanos = TimeUnit.NANOSECONDS.convert(startMs, TimeUnit.MILLISECONDS);
+ final long maxNanos = TimeUnit.NANOSECONDS.convert(maxMs, TimeUnit.MILLISECONDS);
+ return Stream.iterate(startMs, lastMs -> {
+ long lastNanos = TimeUnit.MILLISECONDS.convert(lastMs, TimeUnit.NANOSECONDS);
+ long randRange = Math.abs(lastNanos * 3 - startNanos);
+ long randBackoff;
+ if (0L == randRange) {
+ randBackoff = startNanos;
+ } else {
+ randBackoff = startNanos + ThreadLocalRandom.current().nextLong(randRange);
+ }
+ long backOffNanos = Math.min(maxNanos, randBackoff);
+ return TimeUnit.MILLISECONDS.convert(backOffNanos, TimeUnit.NANOSECONDS);
+ });
+
+ }
+
+ /**
+ * Create infinite backoffs that keep half of the exponential growth, and jitter
+ * between 0 and that amount.
+ *
+ * <p>this is "equal jitter" via http://www.awsarchitectureblog.com/2015/03/backoff.html
+ *
+ * @param startMs initial backoff in milliseconds.
+ * @param maxMs max backoff in milliseconds.
+ * @return a stream of exponential backoffs with jitters.
+ */
+ public static Stream<Long> equalJittered(long startMs,
+ long maxMs) {
+ final long startNanos = TimeUnit.NANOSECONDS.convert(startMs, TimeUnit.MILLISECONDS);
+ final long maxNanos = TimeUnit.NANOSECONDS.convert(maxMs, TimeUnit.MILLISECONDS);
+ final AtomicLong attempts = new AtomicLong(1);
+ return Stream.iterate(startMs, lastMs -> {
+ long shift = Math.min(attempts.get() - 1, MaxBitShift);
+ long halfExpNanos = startNanos * (1L << shift);
+ long backoffNanos = halfExpNanos + ThreadLocalRandom.current().nextLong(halfExpNanos);
+ attempts.incrementAndGet();
+ if (backoffNanos < maxNanos) {
+ return TimeUnit.MILLISECONDS.convert(backoffNanos, TimeUnit.NANOSECONDS);
+ } else {
+ return maxMs;
+ }
+ });
+ }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MathUtils.java
similarity index 91%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MathUtils.java
index 1b3044d..94999a4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MathUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,15 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.bookkeeper.util;
+package org.apache.bookkeeper.common.util;
import java.util.concurrent.TimeUnit;
/**
- * Provides misc math functions that don't come standard
+ * Provides misc math functions that don't come standard.
*/
public class MathUtils {
+
private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
+
public static int signSafeMod(long dividend, int divisor) {
int mod = (int) (dividend % divisor);
@@ -32,7 +34,6 @@ public class MathUtils {
}
return mod;
-
}
public static int findNextPositivePowerOfTwo(final int value) {
@@ -45,7 +46,7 @@ public class MathUtils {
* changes. This is appropriate to use when computing how much longer to
* wait for an interval to expire.
*
- * NOTE: only use it for measuring.
+ * <p>NOTE: only use it for measuring.
* http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
*
* @return current time in milliseconds.
@@ -60,7 +61,7 @@ public class MathUtils {
* changes. This is appropriate to use when computing how much longer to
* wait for an interval to expire.
*
- * NOTE: only use it for measuring.
+ * <p>NOTE: only use it for measuring.
* http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
*
* @return current time in nanoseconds.
@@ -71,18 +72,18 @@ public class MathUtils {
/**
* Milliseconds elapsed since the time specified, the input is nanoTime
- * the only conversion happens when computing the elapsed time
+ * the only conversion happens when computing the elapsed time.
*
* @param startNanoTime the start of the interval that we are measuring
* @return elapsed time in milliseconds.
*/
- public static long elapsedMSec (long startNanoTime) {
- return (System.nanoTime() - startNanoTime)/ NANOSECONDS_PER_MILLISECOND;
+ public static long elapsedMSec(long startNanoTime) {
+ return (System.nanoTime() - startNanoTime) / NANOSECONDS_PER_MILLISECOND;
}
/**
* Microseconds elapsed since the time specified, the input is nanoTime
- * the only conversion happens when computing the elapsed time
+ * the only conversion happens when computing the elapsed time.
*
* @param startNanoTime the start of the interval that we are measuring
* @return elapsed time in milliseconds.
@@ -93,7 +94,7 @@ public class MathUtils {
/**
* Nanoseconds elapsed since the time specified, the input is nanoTime
- * the only conversion happens when computing the elapsed time
+ * the only conversion happens when computing the elapsed time.
*
* @param startNanoTime the start of the interval that we are measuring
* @return elapsed time in milliseconds.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
similarity index 73%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index 76f0830..41a7fa0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,9 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.bookkeeper.util;
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -25,38 +26,35 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class provides 2 things over the java {@link ScheduledExecutorService}.
*
- * 1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
+ * <p>1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
* This means that exceptions in scheduled tasks wont go unnoticed and will be
* logged.
*
- * 2. It supports submitting tasks with an ordering key, so that tasks submitted
+ * <p>2. It supports submitting tasks with an ordering key, so that tasks submitted
* with the same key will always be executed in order, but tasks across
* different keys can be unordered. This retains parallelism while retaining the
* basic amount of ordering we want (e.g. , per ledger handle). Ordering is
* achieved by hashing the key objects to threads by their {@link #hashCode()}
* method.
- *
*/
-public class OrderedSafeExecutor {
- final static long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
+public class OrderedScheduler {
+
+ protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
+
final String name;
final ListeningScheduledExecutorService threads[];
final long threadIds[];
@@ -66,59 +64,78 @@ public class OrderedSafeExecutor {
final boolean traceTaskExecution;
final long warnTimeMicroSec;
- public static Builder newBuilder() {
- return new Builder();
+ /**
+ * Create a builder to build ordered scheduler.
+ *
+ * @return builder to build ordered scheduler.
+ */
+ public static SchedulerBuilder newSchedulerBuilder() {
+ return new SchedulerBuilder();
}
- public static class Builder {
- private String name = "OrderedSafeExecutor";
- private int numThreads = Runtime.getRuntime().availableProcessors();
- private ThreadFactory threadFactory = null;
- private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
- private boolean traceTaskExecution = false;
- private long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
+ /**
+ * Builder to build ordered scheduler.
+ */
+ public static class SchedulerBuilder extends AbstractBuilder<OrderedScheduler> {}
- public Builder name(String name) {
+ /**
+ * Abstract builder class to build {@link OrderedScheduler}.
+ */
+ public abstract static class AbstractBuilder<T extends OrderedScheduler> {
+ protected String name = getClass().getSimpleName();
+ protected int numThreads = Runtime.getRuntime().availableProcessors();
+ protected ThreadFactory threadFactory = null;
+ protected StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+ protected boolean traceTaskExecution = false;
+ protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
+
+ public AbstractBuilder<T> name(String name) {
this.name = name;
return this;
}
- public Builder numThreads(int num) {
+ public AbstractBuilder<T> numThreads(int num) {
this.numThreads = num;
return this;
}
- public Builder threadFactory(ThreadFactory threadFactory) {
+ public AbstractBuilder<T> threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
- public Builder statsLogger(StatsLogger statsLogger) {
+ public AbstractBuilder<T> statsLogger(StatsLogger statsLogger) {
this.statsLogger = statsLogger;
return this;
}
- public Builder traceTaskExecution(boolean enabled) {
+ public AbstractBuilder<T> traceTaskExecution(boolean enabled) {
this.traceTaskExecution = enabled;
return this;
}
- public Builder traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
+ public AbstractBuilder<T> traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
this.warnTimeMicroSec = warnTimeMicroSec;
return this;
}
- public OrderedSafeExecutor build() {
+ @SuppressWarnings("unchecked")
+ public T build() {
if (null == threadFactory) {
threadFactory = Executors.defaultThreadFactory();
}
- return new OrderedSafeExecutor(name, numThreads, threadFactory, statsLogger,
- traceTaskExecution, warnTimeMicroSec);
+ return (T) new OrderedScheduler(
+ name,
+ numThreads,
+ threadFactory,
+ statsLogger,
+ traceTaskExecution,
+ warnTimeMicroSec);
}
}
- private class TimedRunnable extends SafeRunnable {
+ private class TimedRunnable implements SafeRunnable {
final SafeRunnable runnable;
final long initNanos;
@@ -135,20 +152,14 @@ public class OrderedSafeExecutor {
long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
if (elapsedMicroSec >= warnTimeMicroSec) {
- logger.warn("Runnable {}:{} took too long {} micros to execute.",
+ LOGGER.warn("Runnable {}:{} took too long {} micros to execute.",
new Object[] { runnable, runnable.getClass(), elapsedMicroSec });
}
}
- }
-
- @Deprecated
- public OrderedSafeExecutor(int numThreads, String threadName) {
- this(threadName, numThreads, Executors.defaultThreadFactory(), NullStatsLogger.INSTANCE,
- false, WARN_TIME_MICRO_SEC_DEFAULT);
}
/**
- * Constructs Safe executor
+ * Constructs an ordered scheduler.
*
* @param numThreads
* - number of threads
@@ -163,11 +174,14 @@ public class OrderedSafeExecutor {
* @param warnTimeMicroSec
* - log long task exec warning after this interval
*/
- private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
- StatsLogger statsLogger, boolean traceTaskExecution,
- long warnTimeMicroSec) {
- Preconditions.checkArgument(numThreads > 0);
- Preconditions.checkArgument(!StringUtils.isBlank(baseName));
+ protected OrderedScheduler(String baseName,
+ int numThreads,
+ ThreadFactory threadFactory,
+ StatsLogger statsLogger,
+ boolean traceTaskExecution,
+ long warnTimeMicroSec) {
+ checkArgument(numThreads > 0);
+ checkArgument(!StringUtils.isBlank(baseName));
this.warnTimeMicroSec = warnTimeMicroSec;
name = baseName;
@@ -176,7 +190,7 @@ public class OrderedSafeExecutor {
for (int i = 0; i < numThreads; i++) {
final ScheduledThreadPoolExecutor thread = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
- .setNameFormat(name + "-orderedsafeexecutor-" + i + "-%d")
+ .setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
.setThreadFactory(threadFactory)
.build());
threads[i] = MoreExecutors.listeningDecorator(thread);
@@ -255,7 +269,7 @@ public class OrderedSafeExecutor {
}
/**
- * skip hashcode generation in this special case
+ * skip hashcode generation in this special case.
*
* @param orderingKey long ordering key
* @return the thread for executing this order key
@@ -277,14 +291,15 @@ public class OrderedSafeExecutor {
}
/**
- * schedules a one time action to execute
+ * schedules a one time action to execute.
*/
public void submit(SafeRunnable r) {
chooseThread().submit(timedRunnable(r));
}
/**
- * schedules a one time action to execute with an ordering guarantee on the key
+ * schedules a one time action to execute with an ordering guarantee on the key.
+ *
* @param orderingKey
* @param r
*/
@@ -293,7 +308,8 @@ public class OrderedSafeExecutor {
}
/**
- * schedules a one time action to execute with an ordering guarantee on the key
+ * schedules a one time action to execute with an ordering guarantee on the key.
+ *
* @param orderingKey
* @param r
*/
@@ -302,7 +318,8 @@ public class OrderedSafeExecutor {
}
/**
- * schedules a one time action to execute with an ordering guarantee on the key
+ * schedules a one time action to execute with an ordering guarantee on the key.
+ *
* @param orderingKey
* @param r
*/
@@ -327,7 +344,8 @@ public class OrderedSafeExecutor {
* @param command - the SafeRunnable to execute
* @param delay - the time from now to delay execution
* @param unit - the time unit of the delay parameter
- * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
+ * @return a ScheduledFuture representing pending completion of the task and whose get() method
+ * will return null upon completion
*/
public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
return chooseThread().schedule(command, delay, unit);
@@ -340,7 +358,8 @@ public class OrderedSafeExecutor {
* @param command - the SafeRunnable to execute
* @param delay - the time from now to delay execution
* @param unit - the time unit of the delay parameter
- * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
+ * @return a ScheduledFuture representing pending completion of the task and whose get() method
+ * will return null upon completion
*/
public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
return chooseThread(orderingKey).schedule(command, delay, unit);
@@ -348,9 +367,9 @@ public class OrderedSafeExecutor {
/**
* Creates and executes a periodic action that becomes enabled first after
- * the given initial delay, and subsequently with the given period;
+ * the given initial delay, and subsequently with the given period.
*
- * For more details check scheduleAtFixedRate in interface ScheduledExecutorService
+ * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
*
* @param command - the SafeRunnable to execute
* @param initialDelay - the time to delay first execution
@@ -365,9 +384,9 @@ public class OrderedSafeExecutor {
/**
* Creates and executes a periodic action that becomes enabled first after
- * the given initial delay, and subsequently with the given period;
+ * the given initial delay, and subsequently with the given period.
*
- * For more details check scheduleAtFixedRate in interface ScheduledExecutorService
+ * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
*
* @param orderingKey - the key used for ordering
* @param command - the SafeRunnable to execute
@@ -386,7 +405,8 @@ public class OrderedSafeExecutor {
* Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
* with the given delay between the termination of one execution and the commencement of the next.
*
- * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
+ * <p>For more details check
+ * {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
*
* @param command - the SafeRunnable to execute
* @param initialDelay - the time to delay first execution
@@ -404,7 +424,8 @@ public class OrderedSafeExecutor {
* Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
* with the given delay between the termination of one execution and the commencement of the next.
*
- * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
+ * <p>For more details check
+ * {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
*
* @param orderingKey - the key used for ordering
* @param command - the SafeRunnable to execute
@@ -419,7 +440,7 @@ public class OrderedSafeExecutor {
return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
- private long getThreadID(long orderingKey) {
+ protected long getThreadID(long orderingKey) {
// skip hashcode generation in this special case
if (threadIds.length == 1) {
return threadIds[0];
@@ -452,63 +473,11 @@ public class OrderedSafeExecutor {
if (!threads[i].awaitTermination(timeout, unit)) {
threads[i].shutdownNow();
}
- }
- catch (InterruptedException exception) {
+ } catch (InterruptedException exception) {
threads[i].shutdownNow();
Thread.currentThread().interrupt();
}
}
}
- /**
- * Generic callback implementation which will run the
- * callback in the thread which matches the ordering key
- */
- public static abstract class OrderedSafeGenericCallback<T>
- implements GenericCallback<T> {
- private static final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class);
-
- private final OrderedSafeExecutor executor;
- private final long orderingKey;
-
- /**
- * @param executor The executor on which to run the callback
- * @param orderingKey Key used to decide which thread the callback
- * should run on.
- */
- public OrderedSafeGenericCallback(OrderedSafeExecutor executor, long orderingKey) {
- this.executor = executor;
- this.orderingKey = orderingKey;
- }
-
- @Override
- public final void operationComplete(final int rc, final T result) {
- // during closing, callbacks that are error out might try to submit to
- // the scheduler again. if the submission will go to same thread, we
- // don't need to submit to executor again. this is also an optimization for
- // callback submission
- if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
- safeOperationComplete(rc, result);
- } else {
- try {
- executor.submitOrdered(orderingKey, new SafeRunnable() {
- @Override
- public void safeRun() {
- safeOperationComplete(rc, result);
- }
- @Override
- public String toString() {
- return String.format("Callback(key=%s, name=%s)",
- orderingKey,
- OrderedSafeGenericCallback.this);
- }
- });
- } catch (RejectedExecutionException re) {
- LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
- }
- }
- }
-
- public abstract void safeOperationComplete(int rc, T result);
- }
}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
new file mode 100644
index 0000000..3dd4da8
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+
+/**
+ * A util class for supporting retries with customized backoff.
+ */
+public final class Retries {
+
+ private Retries() {
+ }
+
+ public static final Predicate<Throwable> NonFatalPredicate =
+ cause -> !(cause instanceof RuntimeException);
+
+ /**
+ * Retry a given {@code task} on failures.
+ *
+ * <p>It is a shortcut for {@link #run(Stream, Predicate, Supplier, OrderedScheduler, Object)}
+ * that runs retries on any thread of the provided {@code scheduler}.
+ *
+ * @param backoffs a stream of backoff delays, in milliseconds.
+ * @param retryPredicate a predicate to test if failures are retryable.
+ * @param task a task to execute.
+ * @param scheduler scheduler to schedule the task and complete the futures.
+ * @param <ReturnT> the return type
+ * @return a future that represents the result of the task with retries.
+ */
+ public static <ReturnT> CompletableFuture<ReturnT> run(
+ Stream<Long> backoffs,
+ Predicate<Throwable> retryPredicate,
+ Supplier<CompletableFuture<ReturnT>> task,
+ OrderedScheduler scheduler) {
+ return run(backoffs, retryPredicate, task, scheduler, null);
+ }
+
+ /**
+ * Retry a given {@code task} on failures.
+ *
+ * <p>It will only retry the tasks when the predicate {@code retryPredicate} tests
+ * it as a retryable failure and it doesn't exhaust the retry budget. The retry delays
+ * are defined in a stream of delay values (in milliseconds).
+ *
+ * <p>If a schedule {@code key} is provided, the {@code task} will be submitted to the
+ * scheduler using the provided schedule {@code key} and also the returned future
+ * will be completed in the same thread. Otherwise, the task and the returned future will
+ * be executed and scheduled on any threads in the scheduler.
+ *
+ * @param backoffs a stream of backoff delays, in milliseconds.
+ * @param retryPredicate a predicate to test if failures are retryable.
+ * @param task a task to execute.
+ * @param scheduler scheduler to schedule the task and complete the futures.
+ * @param key the submit key for the scheduler.
+ * @param <ReturnT> the return type.
+ * @return a future that represents the result of the task with retries.
+ */
+ public static <ReturnT> CompletableFuture<ReturnT> run(
+ Stream<Long> backoffs,
+ Predicate<Throwable> retryPredicate,
+ Supplier<CompletableFuture<ReturnT>> task,
+ OrderedScheduler scheduler,
+ Object key) {
+ CompletableFuture<ReturnT> future = FutureUtils.createFuture();
+ if (null == key) {
+ execute(
+ future,
+ backoffs.iterator(),
+ retryPredicate,
+ task,
+ scheduler,
+ null);
+ } else {
+ scheduler.submitOrdered(key, () -> execute(
+ future,
+ backoffs.iterator(),
+ retryPredicate,
+ task,
+ scheduler,
+ key));
+ }
+ return future;
+ }
+
+ private static <ReturnT> void execute(
+ CompletableFuture<ReturnT> finalResult,
+ Iterator<Long> backoffIter,
+ Predicate<Throwable> retryPredicate,
+ Supplier<CompletableFuture<ReturnT>> task,
+ OrderedScheduler scheduler,
+ Object key) {
+
+ FutureUtils.whenCompleteAsync(task.get(), (result, cause) -> {
+ if (null == cause) {
+ finalResult.complete(result);
+ return;
+ }
+ if (retryPredicate.test(cause)) {
+ if (!backoffIter.hasNext()) {
+ // the retry budget is exhausted, fail the task now
+ finalResult.completeExceptionally(cause);
+ return;
+ }
+ long nextRetryDelayMs = backoffIter.next();
+ scheduler.scheduleOrdered(key, () -> execute(
+ finalResult,
+ backoffIter,
+ retryPredicate,
+ task,
+ scheduler,
+ key), nextRetryDelayMs, TimeUnit.MILLISECONDS);
+ } else {
+ // the exception can not be retried
+ finalResult.completeExceptionally(cause);
+ }
+ }, scheduler, key);
+ }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SafeRunnable.java
similarity index 66%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SafeRunnable.java
index 8b1e0d0..6a3cf4d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SafeRunnable.java
@@ -1,8 +1,4 @@
-package org.apache.bookkeeper.util;
-
-import java.util.function.Consumer;
-
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,28 +16,35 @@ import java.util.function.Consumer;
* limitations under the License.
*/
+package org.apache.bookkeeper.common.util;
+
+import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class SafeRunnable implements Runnable {
+/**
+ * A runnable whose {@link #run()} catches and logs any {@link Throwable} thrown by {@link #safeRun()}.
+ */
+@FunctionalInterface
+public interface SafeRunnable extends Runnable {
- static final Logger logger = LoggerFactory.getLogger(SafeRunnable.class);
+ Logger LOGGER = LoggerFactory.getLogger(SafeRunnable.class);
@Override
- public void run() {
+ default void run() {
try {
safeRun();
- } catch(Throwable t) {
- logger.error("Unexpected throwable caught ", t);
+ } catch (Throwable t) {
+ LOGGER.error("Unexpected throwable caught ", t);
}
}
- public abstract void safeRun();
+ void safeRun();
/**
- * Utility method to use SafeRunnable from lambdas
- * <p>
- * Eg:
+ * Utility method to use SafeRunnable from lambdas.
+ *
+ * <p>Eg:
* <pre>
* <code>
* executor.submit(SafeRunnable.safeRun(() -> {
@@ -50,7 +53,7 @@ public abstract class SafeRunnable implements Runnable {
* </code>
* </pre>
*/
- public static SafeRunnable safeRun(Runnable runnable) {
+ static SafeRunnable safeRun(Runnable runnable) {
return new SafeRunnable() {
@Override
public void safeRun() {
@@ -61,9 +64,9 @@ public abstract class SafeRunnable implements Runnable {
/**
* Utility method to use SafeRunnable from lambdas with
- * a custom exception handler
- * <p>
- * Eg:
+ * a custom exception handler.
+ *
+ * <p>Eg:
* <pre>
* <code>
* executor.submit(SafeRunnable.safeRun(() -> {
@@ -79,16 +82,13 @@ public abstract class SafeRunnable implements Runnable {
* handler that will be called when there are any exception
* @return
*/
- public static SafeRunnable safeRun(Runnable runnable, Consumer<Throwable> exceptionHandler) {
- return new SafeRunnable() {
- @Override
- public void safeRun() {
- try {
- runnable.run();
- } catch (Throwable t) {
- exceptionHandler.accept(t);
- throw t;
- }
+ static SafeRunnable safeRun(Runnable runnable, Consumer<Throwable> exceptionHandler) {
+ return () -> {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ exceptionHandler.accept(t);
+ throw t;
}
};
}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/StreamUtil.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/StreamUtil.java
new file mode 100644
index 0000000..28b510a
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/StreamUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Java {@link java.util.stream.Stream} related operations.
+ */
+public class StreamUtil {
+
+ public static <T1, T2, R> Stream<R> zip(Stream<? extends T1> a,
+ Stream<? extends T2> b,
+ BiFunction<? super T1, ? super T2, ? extends R> zipper) {
+ Objects.requireNonNull(zipper);
+ Spliterator<? extends T1> aSpliterator = Objects.requireNonNull(a).spliterator();
+ Spliterator<? extends T2> bSpliterator = Objects.requireNonNull(b).spliterator();
+
+ // Zipping loses DISTINCT and SORTED characteristics
+ int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics()
+ & ~(Spliterator.DISTINCT | Spliterator.SORTED);
+
+ long zipSize = ((characteristics & Spliterator.SIZED) != 0)
+ ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
+ : -1;
+
+ Iterator<T1> aIterator = Spliterators.iterator(aSpliterator);
+ Iterator<T2> bIterator = Spliterators.iterator(bSpliterator);
+ Iterator<R> cIterator = new Iterator<R>() {
+ @Override
+ public boolean hasNext() {
+ return aIterator.hasNext() && bIterator.hasNext();
+ }
+
+ @Override
+ public R next() {
+ return zipper.apply(aIterator.next(), bIterator.next());
+ }
+ };
+
+ Spliterator<R> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
+ return (a.isParallel() || b.isParallel())
+ ? StreamSupport.stream(split, true)
+ : StreamSupport.stream(split, false);
+ }
+
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/concurrent/TestFutureUtils.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/concurrent/TestFutureUtils.java
new file mode 100644
index 0000000..fe11e5a
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/concurrent/TestFutureUtils.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.LongStream;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.junit.Test;
+
+/**
+ * Unit Test for {@link FutureUtils}.
+ */
+public class TestFutureUtils {
+
+ /**
+ * Test Exception.
+ */
+ static class TestException extends IOException {
+ private static final long serialVersionUID = -6256482498453846308L;
+
+ public TestException() {
+ super("test-exception");
+ }
+ }
+
+ @Test
+ public void testComplete() throws Exception {
+ CompletableFuture<Long> future = FutureUtils.createFuture();
+ FutureUtils.complete(future, 1024L);
+ assertEquals(1024L, FutureUtils.result(future).longValue());
+ }
+
+ @Test(expected = TestException.class)
+ public void testCompleteExceptionally() throws Exception {
+ CompletableFuture<Long> future = FutureUtils.createFuture();
+ FutureUtils.completeExceptionally(future, new TestException());
+ FutureUtils.result(future);
+ }
+
+ @Test
+ public void testWhenCompleteAsync() throws Exception {
+ OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
+ .name("test-when-complete-async")
+ .numThreads(1)
+ .build();
+ AtomicLong resultHolder = new AtomicLong(0L);
+ CountDownLatch latch = new CountDownLatch(1);
+ CompletableFuture<Long> future = FutureUtils.createFuture();
+ FutureUtils.whenCompleteAsync(
+ future,
+ (result, cause) -> {
+ resultHolder.set(result);
+ latch.countDown();
+ },
+ scheduler,
+ new Object());
+ FutureUtils.complete(future, 1234L);
+ latch.await();
+ assertEquals(1234L, resultHolder.get());
+ }
+
+ @Test
+ public void testProxyToSuccess() throws Exception {
+ CompletableFuture<Long> src = FutureUtils.createFuture();
+ CompletableFuture<Long> target = FutureUtils.createFuture();
+ FutureUtils.proxyTo(src, target);
+ FutureUtils.complete(src, 10L);
+ assertEquals(10L, FutureUtils.result(target).longValue());
+ }
+
+ @Test(expected = TestException.class)
+ public void testProxyToFailure() throws Exception {
+ CompletableFuture<Long> src = FutureUtils.createFuture();
+ CompletableFuture<Long> target = FutureUtils.createFuture();
+ FutureUtils.proxyTo(src, target);
+ FutureUtils.completeExceptionally(src, new TestException());
+ FutureUtils.result(target);
+ }
+
+ @Test
+ public void testVoid() throws Exception {
+ CompletableFuture<Void> voidFuture = FutureUtils.Void();
+ assertTrue(voidFuture.isDone());
+ assertFalse(voidFuture.isCompletedExceptionally());
+ assertFalse(voidFuture.isCancelled());
+ }
+
+ @Test
+ public void testCollectEmptyList() throws Exception {
+ List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+ List<Integer> result = FutureUtils.result(FutureUtils.collect(futures));
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testCollectTenItems() throws Exception {
+ List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+ List<Integer> expectedResults = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ futures.add(FutureUtils.value(i));
+ expectedResults.add(i);
+ }
+ List<Integer> results = FutureUtils.result(FutureUtils.collect(futures));
+ assertEquals(expectedResults, results);
+ }
+
+ @Test(expected = TestException.class)
+ public void testCollectFailures() throws Exception {
+ List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+ List<Integer> expectedResults = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ if (i == 9) {
+ futures.add(FutureUtils.value(i));
+ } else {
+ futures.add(FutureUtils.exception(new TestException()));
+ }
+ expectedResults.add(i);
+ }
+ FutureUtils.result(FutureUtils.collect(futures));
+ }
+
+ @Test
+ public void testWithinAlreadyDone() throws Exception {
+ OrderedScheduler scheduler = mock(OrderedScheduler.class);
+ CompletableFuture<Long> doneFuture = FutureUtils.value(1234L);
+ CompletableFuture<Long> withinFuture = FutureUtils.within(
+ doneFuture,
+ 10,
+ TimeUnit.MILLISECONDS,
+ new TestException(),
+ scheduler,
+ 1234L);
+ TimeUnit.MILLISECONDS.sleep(20);
+ assertTrue(withinFuture.isDone());
+ assertFalse(withinFuture.isCancelled());
+ assertFalse(withinFuture.isCompletedExceptionally());
+ verify(scheduler, times(0))
+ .scheduleOrdered(eq(1234L), isA(SafeRunnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testWithinZeroTimeout() throws Exception {
+ OrderedScheduler scheduler = mock(OrderedScheduler.class);
+ CompletableFuture<Long> newFuture = FutureUtils.createFuture();
+ CompletableFuture<Long> withinFuture = FutureUtils.within(
+ newFuture,
+ 0,
+ TimeUnit.MILLISECONDS,
+ new TestException(),
+ scheduler,
+ 1234L);
+ TimeUnit.MILLISECONDS.sleep(20);
+ assertFalse(withinFuture.isDone());
+ assertFalse(withinFuture.isCancelled());
+ assertFalse(withinFuture.isCompletedExceptionally());
+ verify(scheduler, times(0))
+ .scheduleOrdered(eq(1234L), isA(SafeRunnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testWithinCompleteBeforeTimeout() throws Exception {
+ OrderedScheduler scheduler = mock(OrderedScheduler.class);
+ ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
+ when(scheduler.scheduleOrdered(any(Object.class), any(SafeRunnable.class), anyLong(), any(TimeUnit.class)))
+ .thenAnswer(invocationOnMock -> scheduledFuture);
+ CompletableFuture<Long> newFuture = FutureUtils.createFuture();
+ CompletableFuture<Long> withinFuture = FutureUtils.within(
+ newFuture,
+ Long.MAX_VALUE,
+ TimeUnit.MILLISECONDS,
+ new TestException(),
+ scheduler,
+ 1234L);
+ assertFalse(withinFuture.isDone());
+ assertFalse(withinFuture.isCancelled());
+ assertFalse(withinFuture.isCompletedExceptionally());
+
+ newFuture.complete(5678L);
+
+ assertTrue(withinFuture.isDone());
+ assertFalse(withinFuture.isCancelled());
+ assertFalse(withinFuture.isCompletedExceptionally());
+ assertEquals((Long) 5678L, FutureUtils.result(withinFuture));
+
+ verify(scheduledFuture, times(1))
+ .cancel(eq(true));
+ }
+
+ @Test
+ public void testIgnoreSuccess() {
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
+ underlyFuture.complete(1234L);
+ assertTrue(ignoredFuture.isDone());
+ assertFalse(ignoredFuture.isCompletedExceptionally());
+ assertFalse(ignoredFuture.isCancelled());
+ }
+
+ @Test
+ public void testIgnoreFailure() {
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
+ underlyFuture.completeExceptionally(new TestException());
+ assertTrue(ignoredFuture.isDone());
+ assertFalse(ignoredFuture.isCompletedExceptionally());
+ assertFalse(ignoredFuture.isCancelled());
+ }
+
+ @Test
+ public void testEnsureSuccess() throws Exception {
+ CountDownLatch ensureLatch = new CountDownLatch(1);
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
+ ensureLatch.countDown();
+ });
+ underlyFuture.complete(1234L);
+ FutureUtils.result(ensuredFuture);
+ assertTrue(ensuredFuture.isDone());
+ assertFalse(ensuredFuture.isCompletedExceptionally());
+ assertFalse(ensuredFuture.isCancelled());
+ ensureLatch.await();
+ }
+
+ @Test
+ public void testEnsureFailure() throws Exception {
+ CountDownLatch ensureLatch = new CountDownLatch(1);
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
+ ensureLatch.countDown();
+ });
+ underlyFuture.completeExceptionally(new TestException());
+ FutureUtils.result(FutureUtils.ignore(ensuredFuture));
+ assertTrue(ensuredFuture.isDone());
+ assertTrue(ensuredFuture.isCompletedExceptionally());
+ assertFalse(ensuredFuture.isCancelled());
+ ensureLatch.await();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testRescueSuccess() throws Exception {
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ Function<Throwable, CompletableFuture<Long>> rescueFuc = mock(Function.class);
+ CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, rescueFuc);
+ underlyFuture.complete(1234L);
+ FutureUtils.result(rescuedFuture);
+ assertTrue(rescuedFuture.isDone());
+ assertFalse(rescuedFuture.isCompletedExceptionally());
+ assertFalse(rescuedFuture.isCancelled());
+ verify(rescueFuc, times(0)).apply(any(Throwable.class));
+ }
+
+ @Test
+ public void testRescueFailure() throws Exception {
+ CompletableFuture<Long> futureCompletedAtRescue = FutureUtils.value(3456L);
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, (cause) -> futureCompletedAtRescue);
+ underlyFuture.completeExceptionally(new TestException());
+ FutureUtils.result(rescuedFuture);
+ assertTrue(rescuedFuture.isDone());
+ assertFalse(rescuedFuture.isCompletedExceptionally());
+ assertFalse(rescuedFuture.isCancelled());
+ assertEquals((Long) 3456L, FutureUtils.result(rescuedFuture));
+ }
+
+ @Test
+ public void testStatsSuccess() throws Exception {
+ OpStatsLogger statsLogger = mock(OpStatsLogger.class);
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ CompletableFuture<Long> statsFuture = FutureUtils.stats(
+ underlyFuture,
+ statsLogger,
+ Stopwatch.createStarted());
+ underlyFuture.complete(1234L);
+ FutureUtils.result(statsFuture);
+ verify(statsLogger, times(1))
+ .registerSuccessfulEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
+ }
+
+ @Test
+ public void testStatsFailure() throws Exception {
+ OpStatsLogger statsLogger = mock(OpStatsLogger.class);
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ CompletableFuture<Long> statsFuture = FutureUtils.stats(
+ underlyFuture,
+ statsLogger,
+ Stopwatch.createStarted());
+ underlyFuture.completeExceptionally(new TestException());
+ FutureUtils.result(FutureUtils.ignore(statsFuture));
+ verify(statsLogger, times(1))
+ .registerFailedEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
+ }
+
+ @Test
+ public void testProcessListSuccess() throws Exception {
+ List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
+ List<Long> expectedList = Lists.transform(
+ longList,
+ aLong -> 2 * aLong);
+ Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
+ CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
+ longList,
+ sumFunc,
+ null);
+ assertEquals(expectedList, FutureUtils.result(totalFuture));
+ }
+
+ @Test
+ public void testProcessEmptyList() throws Exception {
+ List<Long> longList = Lists.newArrayList();
+ List<Long> expectedList = Lists.transform(
+ longList,
+ aLong -> 2 * aLong);
+ Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
+ CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
+ longList,
+ sumFunc,
+ null);
+ assertEquals(expectedList, FutureUtils.result(totalFuture));
+ }
+
+ @Test
+ public void testProcessListFailures() throws Exception {
+ List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
+ AtomicLong total = new AtomicLong(0L);
+ Function<Long, CompletableFuture<Long>> sumFunc = value -> {
+ if (value < 5) {
+ total.addAndGet(value);
+ return FutureUtils.value(2 * value);
+ } else {
+ return FutureUtils.exception(new TestException());
+ }
+ };
+ CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
+ longList,
+ sumFunc,
+ null);
+ try {
+ FutureUtils.result(totalFuture);
+ fail("Should fail with TestException");
+ } catch (TestException te) {
+ // as expected
+ }
+ assertEquals(10L, total.get());
+ }
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestBackoff.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestBackoff.java
new file mode 100644
index 0000000..b35fcc0
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestBackoff.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.apache.bookkeeper.common.util.Backoff.Jitter.Type.DECORRELATED;
+import static org.apache.bookkeeper.common.util.Backoff.Jitter.Type.EQUAL;
+import static org.apache.bookkeeper.common.util.Backoff.Jitter.Type.EXPONENTIAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link Backoff}.
+ */
+public class TestBackoff {
+
+ static <T> void assertStreamEquals(Stream<T> s1, Stream<T> s2) {
+ Iterator<T> iter1 = s1.iterator(), iter2 = s2.iterator();
+ while (iter1.hasNext() && iter2.hasNext()) {
+ T expectedValue = iter1.next();
+ T actualValue = iter2.next();
+ assertEquals("Expected = " + expectedValue + ", Actual = " + actualValue,
+ expectedValue, actualValue);
+ }
+ assertTrue(!iter1.hasNext() && !iter2.hasNext());
+ }
+
+ @Test
+ public void testExponential() throws Exception {
+ Stream<Long> backoffs = Backoff.exponential(1000, 2, Long.MAX_VALUE).limit(10);
+ Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> (1000L << i));
+ assertStreamEquals(expectedBackoffs, backoffs);
+ }
+
+ @Test
+ public void testExponentialPolicy() throws Exception {
+ Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> (1000L << i));
+ Backoff.Policy policy = Backoff.Exponential.of(1000, Long.MAX_VALUE, 2, 10);
+ assertStreamEquals(expectedBackoffs, policy.toBackoffs());
+ }
+
+ @Test
+ public void testExponentialWithUpperLimit() throws Exception {
+ Stream<Long> backoffs = Backoff.exponential(1000, 2, 32000).limit(10);
+ Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> Math.min(1000L << i, 32000));
+ assertStreamEquals(expectedBackoffs, backoffs);
+ }
+
+ @Test
+ public void testExponentialPolicyWithUpperLimit() throws Exception {
+ Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> Math.min(1000L << i, 32000));
+ Backoff.Policy policy = Backoff.Exponential.of(1000, 32000, 2, 10);
+ assertStreamEquals(expectedBackoffs, policy.toBackoffs());
+ }
+
+ @Test
+ public void testExponentialJittered() throws Exception {
+ Stream<Long> backoffs = Backoff.exponentialJittered(5, 120).limit(10);
+ // Expected: 5, then random values bounded by: 10, 20, 40, 80, 120, 120, 120 ...
+ Stream<Long> maxBackoffs = Stream.of(5L, 10L, 20L, 40L, 80L, 120L, 120L, 120L, 120L, 120L);
+ // zip() is lazy: without a terminal operation the zipper, and the assertion inside it,
+ // would never be executed.
+ StreamUtil.<Long, Long, Void>zip(backoffs, maxBackoffs, (backoff, maxBackoff) -> {
+ assertTrue(backoff <= maxBackoff);
+ return null;
+ }).forEach(ignored -> { });
+ }
+
+ @Test
+ public void testExponentialJitteredPolicy() throws Exception {
+ Stream<Long> backoffs = Backoff.Jitter.of(EXPONENTIAL, 5, 120, 10).toBackoffs();
+ // Expected: 5, then random values bounded by: 10, 20, 40, 80, 120, 120, 120 ...
+ Stream<Long> maxBackoffs = Stream.of(5L, 10L, 20L, 40L, 80L, 120L, 120L, 120L, 120L, 120L);
+ // zip() is lazy: without a terminal operation the zipper, and the assertion inside it,
+ // would never be executed.
+ StreamUtil.<Long, Long, Void>zip(backoffs, maxBackoffs, (backoff, maxBackoff) -> {
+ assertTrue(backoff <= maxBackoff);
+ return null;
+ }).forEach(ignored -> { });
+ }
+
+ @Test
+ public void testConstant() throws Exception {
+ Stream<Long> backoffs = Backoff.constant(12345L).limit(10);
+ Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> 12345L);
+ assertStreamEquals(expectedBackoffs, backoffs);
+ }
+
+ @Test
+ public void testConstantPolicy() throws Exception {
+ Stream<Long> backoffs = Backoff.Constant.of(12345L, 10).toBackoffs();
+ Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> 12345L);
+ assertStreamEquals(expectedBackoffs, backoffs);
+ }
+
+ @Test
+ public void testEqualJittered() throws Exception {
+ Stream<Long> backoffs = Backoff.equalJittered(5, 120).limit(10);
+ // Inclusive [min, max] bounds for each successive backoff; zip() stops at the shorter stream.
+ Stream<Pair<Long, Long>> ranges = Stream.of(
+ Pair.of(5L, 10L),
+ Pair.of(10L, 20L),
+ Pair.of(20L, 40L),
+ Pair.of(40L, 80L),
+ Pair.of(80L, 120L),
+ Pair.of(80L, 120L),
+ Pair.of(80L, 120L),
+ Pair.of(80L, 120L),
+ Pair.of(80L, 120L)
+ );
+ // zip() is lazy: without a terminal operation the zipper, and the assertions inside it,
+ // would never be executed.
+ StreamUtil.<Long, Pair<Long, Long>, Void>zip(backoffs, ranges, (backoff, range) -> {
+ assertTrue(backoff >= range.getLeft());
+ assertTrue(backoff <= range.getRight());
+ return null;
+ }).forEach(ignored -> { });
+ }
+
+ @Test
+ public void testEqualJitteredPolicy() throws Exception {
+ Stream<Long> backoffs = Backoff.Jitter.of(EQUAL, 5, 120, 10).toBackoffs();
+ // Inclusive [min, max] bounds for each successive backoff; zip() stops at the shorter stream.
+ Stream<Pair<Long, Long>> ranges = Stream.of(
+ Pair.of(5L, 10L),
+ Pair.of(10L, 20L),
+ Pair.of(20L, 40L),
+ Pair.of(40L, 80L),
+ Pair.of(80L, 120L),
+ Pair.of(80L, 120L),
+ Pair.of(80L, 120L),
+ Pair.of(80L, 120L),
+ Pair.of(80L, 120L)
+ );
+ // zip() is lazy: without a terminal operation the zipper, and the assertions inside it,
+ // would never be executed.
+ StreamUtil.<Long, Pair<Long, Long>, Void>zip(backoffs, ranges, (backoff, range) -> {
+ assertTrue(backoff >= range.getLeft());
+ assertTrue(backoff <= range.getRight());
+ return null;
+ }).forEach(ignored -> { });
+ }
+
+ @Test
+ public void testDecorrelatedJittered() throws Exception {
+ long startMs = ThreadLocalRandom.current().nextLong(1L, 1000L);
+ long maxMs = ThreadLocalRandom.current().nextLong(startMs, startMs * 2);
+ Stream<Long> backoffs = Backoff.decorrelatedJittered(startMs, maxMs).limit(10);
+ Iterator<Long> backoffIter = backoffs.iterator();
+ assertTrue(backoffIter.hasNext());
+ assertEquals(startMs, backoffIter.next().longValue());
+ AtomicLong prevMs = new AtomicLong(startMs);
+ backoffIter.forEachRemaining(backoffMs -> {
+ assertTrue(backoffMs >= startMs);
+ assertTrue(backoffMs <= prevMs.get() * 3);
+ assertTrue(backoffMs <= maxMs);
+ prevMs.set(backoffMs);
+ });
+ }
+
+ @Test
+ public void testDecorrelatedJitteredPolicy() throws Exception {
+ long startMs = ThreadLocalRandom.current().nextLong(1L, 1000L);
+ long maxMs = ThreadLocalRandom.current().nextLong(startMs, startMs * 2);
+ Stream<Long> backoffs = Backoff.Jitter.of(DECORRELATED, startMs, maxMs, 10).toBackoffs();
+ Iterator<Long> backoffIter = backoffs.iterator();
+ assertTrue(backoffIter.hasNext());
+ assertEquals(startMs, backoffIter.next().longValue());
+ AtomicLong prevMs = new AtomicLong(startMs);
+ backoffIter.forEachRemaining(backoffMs -> {
+ assertTrue(backoffMs >= startMs);
+ assertTrue(backoffMs <= prevMs.get() * 3);
+ assertTrue(backoffMs <= maxMs);
+ prevMs.set(backoffMs);
+ });
+ }
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestMathUtils.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestMathUtils.java
new file mode 100644
index 0000000..3bd58c2
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestMathUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.apache.bookkeeper.common.util.MathUtils.findNextPositivePowerOfTwo;
+import static org.apache.bookkeeper.common.util.MathUtils.now;
+import static org.apache.bookkeeper.common.util.MathUtils.nowInNano;
+import static org.apache.bookkeeper.common.util.MathUtils.signSafeMod;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link MathUtils}.
+ */
+public class TestMathUtils {
+
+ @Test
+ public void testSignSafeMod() {
+ assertEquals(1, signSafeMod(11, 2));
+ assertEquals(1, signSafeMod(-11, 2));
+ assertEquals(1, signSafeMod(11, -2));
+ assertEquals(-3, signSafeMod(-11, -2));
+ }
+
+ @Test
+ public void testFindNextPositivePowerOfTwo() {
+ assertEquals(16384, findNextPositivePowerOfTwo(12345));
+ }
+
+ @Test
+ public void testNow() {
+ long nowInMillis = now();
+ assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) >= nowInMillis);
+ }
+
+ @Test
+ public void testNowInNanos() {
+ long nowInNanos = nowInNano();
+ assertTrue(System.nanoTime() >= nowInNanos);
+ }
+
+}
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 266685a..2042328 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -99,11 +99,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.3.2</version>
- </dependency>
- <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
@@ -114,17 +109,17 @@
<version>1.6</version>
</dependency>
<dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>2.6</version>
- </dependency>
- <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Main.java
deleted file mode 100644
index 3e9583c..0000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Main.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.bookkeeper.util;
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import java.io.IOException;
-
-import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieServer;
-
-public class Main {
-
- static void usage() {
- System.err.println("USAGE: bookkeeper client|bookie");
- }
-
- /**
- * @param args
- * @throws InterruptedException
- * @throws IOException
- */
- public static void main(String[] args) throws Exception {
- if (args.length < 1 || !(args[0].equals("client") || args[0].equals("bookie"))) {
- usage();
- return;
- }
- String newArgs[] = new String[args.length - 1];
- System.arraycopy(args, 1, newArgs, 0, newArgs.length);
- if (args[0].equals("bookie")) {
- BookieServer.main(newArgs);
- } else {
- BookieClient.main(newArgs);
- }
- }
-
-}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
index 1b3044d..d497b04 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
@@ -17,88 +17,10 @@
*/
package org.apache.bookkeeper.util;
-import java.util.concurrent.TimeUnit;
-
/**
* Provides misc math functions that don't come standard
+ *
+ * @deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.MathUtils}.
*/
-public class MathUtils {
- private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
- public static int signSafeMod(long dividend, int divisor) {
- int mod = (int) (dividend % divisor);
-
- if (mod < 0) {
- mod += divisor;
- }
-
- return mod;
-
- }
-
- public static int findNextPositivePowerOfTwo(final int value) {
- return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
- }
-
- /**
- * Current time from some arbitrary time base in the past, counting in
- * milliseconds, and not affected by settimeofday or similar system clock
- * changes. This is appropriate to use when computing how much longer to
- * wait for an interval to expire.
- *
- * NOTE: only use it for measuring.
- * http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
- *
- * @return current time in milliseconds.
- */
- public static long now() {
- return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
- }
-
- /**
- * Current time from some arbitrary time base in the past, counting in
- * nanoseconds, and not affected by settimeofday or similar system clock
- * changes. This is appropriate to use when computing how much longer to
- * wait for an interval to expire.
- *
- * NOTE: only use it for measuring.
- * http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
- *
- * @return current time in nanoseconds.
- */
- public static long nowInNano() {
- return System.nanoTime();
- }
-
- /**
- * Milliseconds elapsed since the time specified, the input is nanoTime
- * the only conversion happens when computing the elapsed time
- *
- * @param startNanoTime the start of the interval that we are measuring
- * @return elapsed time in milliseconds.
- */
- public static long elapsedMSec (long startNanoTime) {
- return (System.nanoTime() - startNanoTime)/ NANOSECONDS_PER_MILLISECOND;
- }
-
- /**
- * Microseconds elapsed since the time specified, the input is nanoTime
- * the only conversion happens when computing the elapsed time
- *
- * @param startNanoTime the start of the interval that we are measuring
- * @return elapsed time in milliseconds.
- */
- public static long elapsedMicroSec(long startNanoTime) {
- return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanoTime);
- }
-
- /**
- * Nanoseconds elapsed since the time specified, the input is nanoTime
- * the only conversion happens when computing the elapsed time
- *
- * @param startNanoTime the start of the interval that we are measuring
- * @return elapsed time in milliseconds.
- */
- public static long elapsedNanos(long startNanoTime) {
- return System.nanoTime() - startNanoTime;
- }
+public class MathUtils extends org.apache.bookkeeper.common.util.MathUtils {
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
index 76f0830..d832d18 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
@@ -17,26 +17,15 @@
*/
package org.apache.bookkeeper.util;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,59 +43,15 @@ import org.slf4j.LoggerFactory;
* achieved by hashing the key objects to threads by their {@link #hashCode()}
* method.
*
+ * @deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.OrderedScheduler}.
*/
-public class OrderedSafeExecutor {
- final static long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
- final String name;
- final ListeningScheduledExecutorService threads[];
- final long threadIds[];
- final Random rand = new Random();
- final OpStatsLogger taskExecutionStats;
- final OpStatsLogger taskPendingStats;
- final boolean traceTaskExecution;
- final long warnTimeMicroSec;
+public class OrderedSafeExecutor extends org.apache.bookkeeper.common.util.OrderedScheduler {
public static Builder newBuilder() {
return new Builder();
}
- public static class Builder {
- private String name = "OrderedSafeExecutor";
- private int numThreads = Runtime.getRuntime().availableProcessors();
- private ThreadFactory threadFactory = null;
- private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
- private boolean traceTaskExecution = false;
- private long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
-
- public Builder name(String name) {
- this.name = name;
- return this;
- }
-
- public Builder numThreads(int num) {
- this.numThreads = num;
- return this;
- }
-
- public Builder threadFactory(ThreadFactory threadFactory) {
- this.threadFactory = threadFactory;
- return this;
- }
-
- public Builder statsLogger(StatsLogger statsLogger) {
- this.statsLogger = statsLogger;
- return this;
- }
-
- public Builder traceTaskExecution(boolean enabled) {
- this.traceTaskExecution = enabled;
- return this;
- }
-
- public Builder traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
- this.warnTimeMicroSec = warnTimeMicroSec;
- return this;
- }
+ public static class Builder extends AbstractBuilder<OrderedSafeExecutor> {
public OrderedSafeExecutor build() {
if (null == threadFactory) {
@@ -118,35 +63,6 @@ public class OrderedSafeExecutor {
}
- private class TimedRunnable extends SafeRunnable {
- final SafeRunnable runnable;
- final long initNanos;
-
- TimedRunnable(SafeRunnable runnable) {
- this.runnable = runnable;
- this.initNanos = MathUtils.nowInNano();
- }
-
- @Override
- public void safeRun() {
- taskPendingStats.registerSuccessfulEvent(initNanos, TimeUnit.NANOSECONDS);
- long startNanos = MathUtils.nowInNano();
- this.runnable.safeRun();
- long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
- taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
- if (elapsedMicroSec >= warnTimeMicroSec) {
- logger.warn("Runnable {}:{} took too long {} micros to execute.",
- new Object[] { runnable, runnable.getClass(), elapsedMicroSec });
- }
- }
- }
-
- @Deprecated
- public OrderedSafeExecutor(int numThreads, String threadName) {
- this(threadName, numThreads, Executors.defaultThreadFactory(), NullStatsLogger.INSTANCE,
- false, WARN_TIME_MICRO_SEC_DEFAULT);
- }
-
/**
* Constructs Safe executor
*
@@ -166,121 +82,14 @@ public class OrderedSafeExecutor {
private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
StatsLogger statsLogger, boolean traceTaskExecution,
long warnTimeMicroSec) {
- Preconditions.checkArgument(numThreads > 0);
- Preconditions.checkArgument(!StringUtils.isBlank(baseName));
-
- this.warnTimeMicroSec = warnTimeMicroSec;
- name = baseName;
- threads = new ListeningScheduledExecutorService[numThreads];
- threadIds = new long[numThreads];
- for (int i = 0; i < numThreads; i++) {
- final ScheduledThreadPoolExecutor thread = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder()
- .setNameFormat(name + "-orderedsafeexecutor-" + i + "-%d")
- .setThreadFactory(threadFactory)
- .build());
- threads[i] = MoreExecutors.listeningDecorator(thread);
- final int idx = i;
- try {
- threads[idx].submit(new SafeRunnable() {
- @Override
- public void safeRun() {
- threadIds[idx] = Thread.currentThread().getId();
- }
- }).get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Couldn't start thread " + i, e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Couldn't start thread " + i, e);
- }
-
- // Register gauges
- statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return thread.getQueue().size();
- }
- });
- statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return thread.getCompletedTaskCount();
- }
- });
- statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return thread.getTaskCount();
- }
- });
- }
-
- // Stats
- this.taskExecutionStats = statsLogger.scope(name).getOpStatsLogger("task_execution");
- this.taskPendingStats = statsLogger.scope(name).getOpStatsLogger("task_queued");
- this.traceTaskExecution = traceTaskExecution;
- }
-
- public ListeningScheduledExecutorService chooseThread() {
- // skip random # generation in this special case
- if (threads.length == 1) {
- return threads[0];
- }
-
- return threads[rand.nextInt(threads.length)];
- }
-
- public ListeningScheduledExecutorService chooseThread(Object orderingKey) {
- // skip hashcode generation in this special case
- if (threads.length == 1) {
- return threads[0];
- }
-
- return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)];
- }
-
- /**
- * skip hashcode generation in this special case
- *
- * @param orderingKey long ordering key
- * @return the thread for executing this order key
- */
- public ListeningScheduledExecutorService chooseThread(long orderingKey) {
- if (threads.length == 1) {
- return threads[0];
- }
-
- return threads[MathUtils.signSafeMod(orderingKey, threads.length)];
- }
-
- private SafeRunnable timedRunnable(SafeRunnable r) {
- if (traceTaskExecution) {
- return new TimedRunnable(r);
- } else {
- return r;
- }
+ super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec);
}
/**
* schedules a one time action to execute
*/
public void submit(SafeRunnable r) {
- chooseThread().submit(timedRunnable(r));
+ super.submit(r);
}
/**
@@ -289,7 +98,7 @@ public class OrderedSafeExecutor {
* @param r
*/
public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
- return chooseThread(orderingKey).submit(timedRunnable(r));
+ return super.submitOrdered(orderingKey, r);
}
/**
@@ -298,7 +107,7 @@ public class OrderedSafeExecutor {
* @param r
*/
public void submitOrdered(long orderingKey, SafeRunnable r) {
- chooseThread(orderingKey).execute(r);
+ super.submitOrdered(orderingKey, r);
}
/**
@@ -307,18 +116,7 @@ public class OrderedSafeExecutor {
* @param r
*/
public void submitOrdered(int orderingKey, SafeRunnable r) {
- chooseThread(orderingKey).execute(r);
- }
-
- /**
- * schedules a one time action to execute with an ordering guarantee on the key.
- *
- * @param orderingKey
- * @param callable
- */
- public <T> ListenableFuture<T> submitOrdered(Object orderingKey,
- java.util.concurrent.Callable<T> callable) {
- return chooseThread(orderingKey).submit(callable);
+ super.submitOrdered(orderingKey, r);
}
/**
@@ -330,7 +128,7 @@ public class OrderedSafeExecutor {
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
*/
public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
- return chooseThread().schedule(command, delay, unit);
+ return super.schedule(command, delay, unit);
}
/**
@@ -343,7 +141,7 @@ public class OrderedSafeExecutor {
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
*/
public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
- return chooseThread(orderingKey).schedule(command, delay, unit);
+ return super.scheduleOrdered(orderingKey, command, delay, unit);
}
/**
@@ -360,7 +158,7 @@ public class OrderedSafeExecutor {
* method will throw an exception upon cancellation
*/
public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
- return chooseThread().scheduleAtFixedRate(command, initialDelay, period, unit);
+ return super.scheduleAtFixedRate(command, initialDelay, period, unit);
}
/**
@@ -379,7 +177,7 @@ public class OrderedSafeExecutor {
*/
public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
long period, TimeUnit unit) {
- return chooseThread(orderingKey).scheduleAtFixedRate(command, initialDelay, period, unit);
+ return super.scheduleAtFixedRateOrdered(orderingKey, command, initialDelay, period, unit);
}
/**
@@ -397,7 +195,7 @@ public class OrderedSafeExecutor {
*/
public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
TimeUnit unit) {
- return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
/**
@@ -416,48 +214,7 @@ public class OrderedSafeExecutor {
*/
public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
long delay, TimeUnit unit) {
- return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
- }
-
- private long getThreadID(long orderingKey) {
- // skip hashcode generation in this special case
- if (threadIds.length == 1) {
- return threadIds[0];
- }
-
- return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
- }
-
- public void shutdown() {
- for (int i = 0; i < threads.length; i++) {
- threads[i].shutdown();
- }
- }
-
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- boolean ret = true;
- for (int i = 0; i < threads.length; i++) {
- ret = ret && threads[i].awaitTermination(timeout, unit);
- }
- return ret;
- }
-
- /**
- * Force threads shutdown (cancel active requests) after specified delay,
- * to be used after shutdown() rejects new requests.
- */
- public void forceShutdown(long timeout, TimeUnit unit) {
- for (int i = 0; i < threads.length; i++) {
- try {
- if (!threads[i].awaitTermination(timeout, unit)) {
- threads[i].shutdownNow();
- }
- }
- catch (InterruptedException exception) {
- threads[i].shutdownNow();
- Thread.currentThread().interrupt();
- }
- }
+ return super.scheduleWithFixedDelayOrdered(orderingKey, command, initialDelay, delay, unit);
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
index 8b1e0d0..7e56cd1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
@@ -1,7 +1,3 @@
-package org.apache.bookkeeper.util;
-
-import java.util.function.Consumer;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,24 +15,11 @@ import java.util.function.Consumer;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.bookkeeper.util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class SafeRunnable implements Runnable {
-
- static final Logger logger = LoggerFactory.getLogger(SafeRunnable.class);
-
- @Override
- public void run() {
- try {
- safeRun();
- } catch(Throwable t) {
- logger.error("Unexpected throwable caught ", t);
- }
- }
+import java.util.function.Consumer;
- public abstract void safeRun();
+public abstract class SafeRunnable implements org.apache.bookkeeper.common.util.SafeRunnable {
/**
* Utility method to use SafeRunnable from lambdas
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index efee56a..1925cde 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -46,6 +46,27 @@
<Method name="getData" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
+ <Match>
+ <Class name="org.apache.bookkeeper.common.concurrent.FutureUtils"/>
+ <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" />
+ </Match>
+ <Match>
+ <Class name="org.apache.bookkeeper.common.concurrent.FutureUtils$1"/>
+ <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+ </Match>
+ <Match>
+ <Class name="org.apache.bookkeeper.common.concurrent.FutureUtils"/>
+ <Method name="Void" />
+ <Bug pattern="NM_METHOD_NAMING_CONVENTION" />
+ </Match>
+ <Match>
+ <Class name="org.apache.bookkeeper.util.MathUtils"/>
+ <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
+ </Match>
+ <Match>
+ <Class name="org.apache.bookkeeper.util.SafeRunnable"/>
+ <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+ </Match>
<And>
<Bug category="MT_CORRECTNESS"/>
<Class name="~org.apache.bookkeeper.util.collections\.[^.]+"/>
diff --git a/pom.xml b/pom.xml
index 9879022..d5b066f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,8 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- dependencies -->
<commons-configuration.version>1.10</commons-configuration.version>
+ <commons-lang3.version>3.3.2</commons-lang3.version>
+ <google.code.version>3.0.2</google.code.version>
<guava.version>20.0</guava.version>
<hamcrest.version>1.3</hamcrest.version>
<jmh.version>1.19</jmh.version>
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].