You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:41 UTC
[09/23] incubator-distributedlog git commit: DL-124: Use Java8 Future
rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java
deleted file mode 100644
index 8e4a8be..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.exceptions.BKTransmitException;
-import org.apache.distributedlog.exceptions.LockingException;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.stats.OpStatsListener;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureCancelledException;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import com.twitter.util.Try;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utilities to process future
- */
-public class FutureUtils {
-
- private static final Logger logger = LoggerFactory.getLogger(FutureUtils.class);
-
- public static class OrderedFutureEventListener<R>
- implements FutureEventListener<R> {
-
- public static <R> OrderedFutureEventListener<R> of(
- FutureEventListener<R> listener,
- OrderedScheduler scheduler,
- Object key) {
- return new OrderedFutureEventListener<R>(scheduler, key, listener);
- }
-
- private final OrderedScheduler scheduler;
- private final Object key;
- private final FutureEventListener<R> listener;
-
- private OrderedFutureEventListener(OrderedScheduler scheduler,
- Object key,
- FutureEventListener<R> listener) {
- this.scheduler = scheduler;
- this.key = key;
- this.listener = listener;
- }
-
- @Override
- public void onSuccess(final R value) {
- scheduler.submit(key, new Runnable() {
- @Override
- public void run() {
- listener.onSuccess(value);
- }
- });
- }
-
- @Override
- public void onFailure(final Throwable cause) {
- scheduler.submit(key, new Runnable() {
- @Override
- public void run() {
- listener.onFailure(cause);
- }
- });
- }
- }
-
- public static class FutureEventListenerRunnable<R>
- implements FutureEventListener<R> {
-
- public static <R> FutureEventListenerRunnable<R> of(
- FutureEventListener<R> listener,
- ExecutorService executorService) {
- return new FutureEventListenerRunnable<R>(executorService, listener);
- }
-
- private final ExecutorService executorService;
- private final FutureEventListener<R> listener;
-
- private FutureEventListenerRunnable(ExecutorService executorService,
- FutureEventListener<R> listener) {
- this.executorService = executorService;
- this.listener = listener;
- }
-
- @Override
- public void onSuccess(final R value) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- listener.onSuccess(value);
- }
- });
- }
-
- @Override
- public void onFailure(final Throwable cause) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- listener.onFailure(cause);
- }
- });
- }
- }
-
- private static class ListFutureProcessor<T, R>
- extends Function<Throwable, BoxedUnit>
- implements FutureEventListener<R>, Runnable {
-
- private volatile boolean interrupted = false;
- private final Iterator<T> itemsIter;
- private final Function<T, Future<R>> processFunc;
- private final Promise<List<R>> promise;
- private final List<R> results;
- private final ExecutorService callbackExecutor;
-
- ListFutureProcessor(List<T> items,
- Function<T, Future<R>> processFunc,
- ExecutorService callbackExecutor) {
- this.itemsIter = items.iterator();
- this.processFunc = processFunc;
- this.promise = new Promise<List<R>>();
- this.promise.setInterruptHandler(this);
- this.results = new ArrayList<R>();
- this.callbackExecutor = callbackExecutor;
- }
-
- @Override
- public BoxedUnit apply(Throwable cause) {
- interrupted = true;
- return BoxedUnit.UNIT;
- }
-
- @Override
- public void onSuccess(R value) {
- results.add(value);
- if (null == callbackExecutor) {
- run();
- } else {
- callbackExecutor.submit(this);
- }
- }
-
- @Override
- public void onFailure(final Throwable cause) {
- if (null == callbackExecutor) {
- promise.setException(cause);
- } else {
- callbackExecutor.submit(new Runnable() {
- @Override
- public void run() {
- promise.setException(cause);
- }
- });
- }
- }
-
- @Override
- public void run() {
- if (interrupted) {
- logger.debug("ListFutureProcessor is interrupted.");
- return;
- }
- if (!itemsIter.hasNext()) {
- promise.setValue(results);
- return;
- }
- processFunc.apply(itemsIter.next()).addEventListener(this);
- }
- }
-
- /**
- * Process the list of items one by one using the process function <i>processFunc</i>.
- * The process will be stopped immediately if it fails on processing any one.
- *
- * @param collection list of items
- * @param processFunc process function
- * @param callbackExecutor executor to process the item
- * @return future presents the list of processed results
- */
- public static <T, R> Future<List<R>> processList(List<T> collection,
- Function<T, Future<R>> processFunc,
- @Nullable ExecutorService callbackExecutor) {
- ListFutureProcessor<T, R> processor =
- new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor);
- if (null != callbackExecutor) {
- callbackExecutor.submit(processor);
- } else {
- processor.run();
- }
- return processor.promise;
- }
-
- /**
- * Add a event listener over <i>result</i> for collecting the operation stats.
- *
- * @param result result to listen on
- * @param opStatsLogger stats logger to record operations stats
- * @param stopwatch stop watch to time operation
- * @param <T>
- * @return result after registered the event listener
- */
- public static <T> Future<T> stats(Future<T> result,
- OpStatsLogger opStatsLogger,
- Stopwatch stopwatch) {
- return result.addEventListener(new OpStatsListener<T>(opStatsLogger, stopwatch));
- }
-
- /**
- * Await for the result of the future and thrown bk related exceptions.
- *
- * @param result future to wait for
- * @return the result of future
- * @throws BKException when exceptions are thrown by the future. If there is unkown exceptions
- * thrown from the future, the exceptions will be wrapped into
- * {@link org.apache.bookkeeper.client.BKException.BKUnexpectedConditionException}.
- */
- public static <T> T bkResult(Future<T> result) throws BKException {
- try {
- return Await.result(result);
- } catch (BKException bke) {
- throw bke;
- } catch (InterruptedException ie) {
- throw BKException.create(BKException.Code.InterruptedException);
- } catch (Exception e) {
- logger.warn("Encountered unexpected exception on waiting bookkeeper results : ", e);
- throw BKException.create(BKException.Code.UnexpectedConditionException);
- }
- }
-
- /**
- * Return the bk exception return code for a <i>throwable</i>.
- *
- * @param throwable the cause of the exception
- * @return the bk exception return code. if the exception isn't bk exceptions,
- * it would return {@link BKException.Code#UnexpectedConditionException}.
- */
- public static int bkResultCode(Throwable throwable) {
- if (throwable instanceof BKException) {
- return ((BKException)throwable).getCode();
- }
- return BKException.Code.UnexpectedConditionException;
- }
-
- /**
- * Wait for the result until it completes.
- *
- * @param result result to wait
- * @return the result
- * @throws IOException when encountered exceptions on the result
- */
- public static <T> T result(Future<T> result) throws IOException {
- return result(result, Duration.Top());
- }
-
- /**
- * Wait for the result for a given <i>duration</i>.
- * <p>If the result is not ready within `duration`, an IOException will thrown wrapping with
- * corresponding {@link com.twitter.util.TimeoutException}.
- *
- * @param result result to wait
- * @param duration duration to wait
- * @return the result
- * @throws IOException when encountered exceptions on the result or waiting for the result.
- */
- public static <T> T result(Future<T> result, Duration duration)
- throws IOException {
- try {
- return Await.result(result, duration);
- } catch (KeeperException ke) {
- throw new ZKException("Encountered zookeeper exception on waiting result", ke);
- } catch (BKException bke) {
- throw new BKTransmitException("Encountered bookkeeper exception on waiting result", bke.getCode());
- } catch (IOException ioe) {
- throw ioe;
- } catch (InterruptedException ie) {
- throw new DLInterruptedException("Interrupted on waiting result", ie);
- } catch (Exception e) {
- throw new IOException("Encountered exception on waiting result", e);
- }
- }
-
- /**
- * Wait for the result of a lock operation.
- *
- * @param result result to wait
- * @param lockPath path of the lock
- * @return the result
- * @throws LockingException when encountered exceptions on the result of lock operation
- */
- public static <T> T lockResult(Future<T> result, String lockPath) throws LockingException {
- try {
- return Await.result(result);
- } catch (LockingException le) {
- throw le;
- } catch (Exception e) {
- throw new LockingException(lockPath, "Encountered exception on locking ", e);
- }
- }
-
- /**
- * Convert the <i>throwable</i> to zookeeper related exceptions.
- *
- * @param throwable cause
- * @param path zookeeper path
- * @return zookeeper related exceptions
- */
- public static Throwable zkException(Throwable throwable, String path) {
- if (throwable instanceof KeeperException) {
- return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable);
- } else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) {
- return new ZKException("Encountered zookeeper connection loss on " + path,
- KeeperException.Code.CONNECTIONLOSS);
- } else if (throwable instanceof InterruptedException) {
- return new DLInterruptedException("Interrupted on operating " + path, throwable);
- } else {
- return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable);
- }
- }
-
- /**
- * Cancel the future. It would interrupt the future.
- *
- * @param future future to cancel
- */
- public static <T> void cancel(Future<T> future) {
- future.raise(new FutureCancelledException());
- }
-
- /**
- * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period.
- * If the promise has been satisfied before raising, it won't change the state of the promise.
- *
- * @param promise promise to raise exception
- * @param timeout timeout period
- * @param unit timeout period unit
- * @param cause cause to raise
- * @param scheduler scheduler to execute raising exception
- * @param key the submit key used by the scheduler
- * @return the promise applied with the raise logic
- */
- public static <T> Promise<T> within(final Promise<T> promise,
- final long timeout,
- final TimeUnit unit,
- final Throwable cause,
- final OrderedScheduler scheduler,
- final Object key) {
- if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) {
- return promise;
- }
- // schedule a timeout to raise timeout exception
- final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() {
- @Override
- public void run() {
- if (!promise.isDefined() && FutureUtils.setException(promise, cause)) {
- logger.info("Raise exception", cause);
- }
- }
- }, timeout, unit);
- // when the promise is satisfied, cancel the timeout task
- promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Try<T> value) {
- if (!task.cancel(true)) {
- logger.debug("Failed to cancel the timeout task");
- }
- return BoxedUnit.UNIT;
- }
- });
- return promise;
- }
-
- /**
- * Satisfy the <i>promise</i> with provide value in an ordered scheduler.
- * <p>If the promise was already satisfied, nothing will be changed.
- *
- * @param promise promise to satisfy
- * @param value value to satisfy
- * @param scheduler scheduler to satisfy the promise with provided value
- * @param key the submit key of the ordered scheduler
- */
- public static <T> void setValue(final Promise<T> promise,
- final T value,
- OrderedScheduler scheduler,
- Object key) {
- scheduler.submit(key, new Runnable() {
- @Override
- public void run() {
- setValue(promise, value);
- }
- });
- }
-
- /**
- * Satisfy the <i>promise</i> with provide value.
- * <p>If the promise was already satisfied, nothing will be changed.
- *
- * @param promise promise to satisfy
- * @param value value to satisfy
- * @return true if successfully satisfy the future. false if the promise has been satisfied.
- */
- public static <T> boolean setValue(Promise<T> promise, T value) {
- boolean success = promise.updateIfEmpty(new Return<T>(value));
- if (!success) {
- logger.info("Result set multiple times. Value = '{}', New = 'Return({})'",
- promise.poll(), value);
- }
- return success;
- }
-
- /**
- * Satisfy the <i>promise</i> with provided <i>cause</i> in an ordered scheduler.
- *
- * @param promise promise to satisfy
- * @param throwable cause to satisfy
- * @param scheduler the scheduler to satisfy the promise
- * @param key submit key of the ordered scheduler
- */
- public static <T> void setException(final Promise<T> promise,
- final Throwable cause,
- OrderedScheduler scheduler,
- Object key) {
- scheduler.submit(key, new Runnable() {
- @Override
- public void run() {
- setException(promise, cause);
- }
- });
- }
-
- /**
- * Satisfy the <i>promise</i> with provided <i>cause</i>.
- *
- * @param promise promise to satisfy
- * @param cause cause to satisfy
- * @return true if successfully satisfy the future. false if the promise has been satisfied.
- */
- public static <T> boolean setException(Promise<T> promise, Throwable cause) {
- boolean success = promise.updateIfEmpty(new Throw<T>(cause));
- if (!success) {
- logger.info("Result set multiple times. Value = '{}', New = 'Throw({})'",
- promise.poll(), cause);
- }
- return success;
- }
-
- /**
- * Ignore exception from the <i>future</i>.
- *
- * @param future the original future
- * @return a transformed future ignores exceptions
- */
- public static <T> Promise<Void> ignore(Future<T> future) {
- return ignore(future, null);
- }
-
- /**
- * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions
- *
- * @param future the original future
- * @param errorMsg the error message to log on exceptions
- * @return a transformed future ignores exceptions
- */
- public static <T> Promise<Void> ignore(Future<T> future, final String errorMsg) {
- final Promise<Void> promise = new Promise<Void>();
- future.addEventListener(new FutureEventListener<T>() {
- @Override
- public void onSuccess(T value) {
- setValue(promise, null);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- if (null != errorMsg) {
- logger.error(errorMsg, cause);
- }
- setValue(promise, null);
- }
- });
- return promise;
- }
-
- /**
- * Create transmit exception from transmit result.
- *
- * @param transmitResult
- * transmit result (basically bk exception code)
- * @return transmit exception
- */
- public static BKTransmitException transmitException(int transmitResult) {
- return new BKTransmitException("Failed to write to bookkeeper; Error is ("
- + transmitResult + ") "
- + BKException.getMessage(transmitResult), transmitResult);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java
deleted file mode 100644
index 3372476..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.google.common.base.Stopwatch;
-
-import com.twitter.util.FuturePool;
-import com.twitter.util.FuturePool$;
-import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import scala.runtime.BoxedUnit;
-import scala.Function0;
-
-/**
- * {@link FuturePool} with exposed stats. This class is exposing following stats for helping understanding
- * the healthy of this thread pool executor.
- * <h3>Metrics</h3>
- * Stats are only exposed when <code>traceTaskExecution</code> is true.
- * <ul>
- * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on waiting
- * being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on executing.
- * <li>task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on submitting.
- * <li>tasks_pending: gauge. how many tasks are pending in this future pool.
- * </ul>
- */
-public class MonitoredFuturePool implements FuturePool {
- static final Logger LOG = LoggerFactory.getLogger(MonitoredFuturePool.class);
-
- private final FuturePool futurePool;
-
- private final StatsLogger statsLogger;
- private final OpStatsLogger taskPendingTime;
- private final OpStatsLogger taskExecutionTime;
- private final OpStatsLogger taskEnqueueTime;
- private final Counter taskPendingCounter;
-
- private final boolean traceTaskExecution;
- private final long traceTaskExecutionWarnTimeUs;
-
- class TimedFunction0<T> extends com.twitter.util.Function0<T> {
- private final Function0<T> function0;
- private Stopwatch pendingStopwatch = Stopwatch.createStarted();
-
- TimedFunction0(Function0<T> function0) {
- this.function0 = function0;
- this.pendingStopwatch = Stopwatch.createStarted();
- }
-
- @Override
- public T apply() {
- taskPendingTime.registerSuccessfulEvent(pendingStopwatch.elapsed(TimeUnit.MICROSECONDS));
- Stopwatch executionStopwatch = Stopwatch.createStarted();
- T result = function0.apply();
- taskExecutionTime.registerSuccessfulEvent(executionStopwatch.elapsed(TimeUnit.MICROSECONDS));
- long elapsed = executionStopwatch.elapsed(TimeUnit.MICROSECONDS);
- if (elapsed > traceTaskExecutionWarnTimeUs) {
- LOG.info("{} took too long {} microseconds", function0.toString(), elapsed);
- }
- return result;
- }
- }
-
- /**
- * Create a future pool with stats exposed.
- *
- * @param futurePool underlying future pool to execute futures
- * @param statsLogger stats logger to receive exposed stats
- * @param traceTaskExecution flag to enable/disable exposing stats about task execution
- * @param traceTaskExecutionWarnTimeUs flag to enable/disable logging slow tasks
- * whose execution time is above this value
- */
- public MonitoredFuturePool(FuturePool futurePool,
- StatsLogger statsLogger,
- boolean traceTaskExecution,
- long traceTaskExecutionWarnTimeUs) {
- this.futurePool = futurePool;
- this.traceTaskExecution = traceTaskExecution;
- this.traceTaskExecutionWarnTimeUs = traceTaskExecutionWarnTimeUs;
- this.statsLogger = statsLogger;
- this.taskPendingTime = statsLogger.getOpStatsLogger("task_pending_time");
- this.taskExecutionTime = statsLogger.getOpStatsLogger("task_execution_time");
- this.taskEnqueueTime = statsLogger.getOpStatsLogger("task_enqueue_time");
- this.taskPendingCounter = statsLogger.getCounter("tasks_pending");
- }
-
- @Override
- public <T> Future<T> apply(Function0<T> function0) {
- if (traceTaskExecution) {
- taskPendingCounter.inc();
- Stopwatch taskEnqueueStopwatch = Stopwatch.createStarted();
- Future<T> futureResult = futurePool.apply(new TimedFunction0<T>(function0));
- taskEnqueueTime.registerSuccessfulEvent(taskEnqueueStopwatch.elapsed(TimeUnit.MICROSECONDS));
- futureResult.ensure(new com.twitter.util.Function0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- taskPendingCounter.dec();
- return null;
- }
- });
- return futureResult;
- } else {
- return futurePool.apply(function0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
deleted file mode 100644
index 3121a19..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link ScheduledThreadPoolExecutor} with exposed stats. This class is exposing following stats for
- * helping understanding the healthy of this thread pool executor.
- * <h3>Metrics</h3>
- * <ul>
- * <li>pending_tasks: gauge. how many tasks are pending in this executor.
- * <li>completed_tasks: gauge. how many tasks are completed in this executor.
- * <li>total_tasks: gauge. how many tasks are submitted to this executor.
- * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on
- * waiting being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on
- * executing.
- * </ul>
- */
-public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
- static final Logger LOG = LoggerFactory.getLogger(MonitoredScheduledThreadPoolExecutor.class);
-
- private class TimedRunnable implements Runnable {
-
- final Runnable runnable;
- final long enqueueNanos;
-
- TimedRunnable(Runnable runnable) {
- this.runnable = runnable;
- this.enqueueNanos = MathUtils.nowInNano();
- }
-
- @Override
- public void run() {
- long startNanos = MathUtils.nowInNano();
- long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
- taskPendingStats.registerSuccessfulEvent(pendingMicros);
- try {
- runnable.run();
- } finally {
- long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
- taskExecutionStats.registerSuccessfulEvent(executionMicros);
- }
- }
-
- @Override
- public String toString() {
- return runnable.toString();
- }
-
- @Override
- public int hashCode() {
- return runnable.hashCode();
- }
- }
-
- private class TimedCallable<T> implements Callable<T> {
-
- final Callable<T> task;
- final long enqueueNanos;
-
- TimedCallable(Callable<T> task) {
- this.task = task;
- this.enqueueNanos = MathUtils.nowInNano();
- }
-
- @Override
- public T call() throws Exception {
- long startNanos = MathUtils.nowInNano();
- long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
- taskPendingStats.registerSuccessfulEvent(pendingMicros);
- try {
- return task.call();
- } finally {
- long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
- taskExecutionStats.registerSuccessfulEvent(executionMicros);
- }
- }
- }
-
- protected final boolean traceTaskExecution;
- protected final OpStatsLogger taskExecutionStats;
- protected final OpStatsLogger taskPendingStats;
- protected final StatsLogger statsLogger;
- // Gauges and their labels
- private static final String pendingTasksGaugeLabel = "pending_tasks";
- private final Gauge<Number> pendingTasksGauge;
- private static final String completedTasksGaugeLabel = "completed_tasks";
- protected final Gauge<Number> completedTasksGauge;
- private static final String totalTasksGaugeLabel = "total_tasks";
- protected final Gauge<Number> totalTasksGauge;
-
- public MonitoredScheduledThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory,
- StatsLogger statsLogger,
- boolean traceTaskExecution) {
- super(corePoolSize, threadFactory);
- this.traceTaskExecution = traceTaskExecution;
- this.statsLogger = statsLogger;
- this.taskPendingStats = this.statsLogger.getOpStatsLogger("task_pending_time");
- this.taskExecutionStats = this.statsLogger.getOpStatsLogger("task_execution_time");
- this.pendingTasksGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return getQueue().size();
- }
- };
- this.completedTasksGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return getCompletedTaskCount();
- }
- };
- this.totalTasksGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return getTaskCount();
- }
- };
-
- // outstanding tasks
- this.statsLogger.registerGauge(pendingTasksGaugeLabel, pendingTasksGauge);
- // completed tasks
- this.statsLogger.registerGauge(completedTasksGaugeLabel, completedTasksGauge);
- // total tasks
- this.statsLogger.registerGauge(totalTasksGaugeLabel, pendingTasksGauge);
- }
-
- private Runnable timedRunnable(Runnable r) {
- return traceTaskExecution ? new TimedRunnable(r) : r;
- }
-
- private <T> Callable<T> timedCallable(Callable<T> task) {
- return traceTaskExecution ? new TimedCallable<T>(task) : task;
- }
-
- @Override
- public Future<?> submit(Runnable task) {
- return super.submit(timedRunnable(task));
- }
-
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- return super.submit(timedRunnable(task), result);
- }
-
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return super.submit(timedCallable(task));
- }
-
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- Throwable hiddenThrowable = extractThrowable(r);
- if (hiddenThrowable != null)
- logAndHandle(hiddenThrowable, true);
-
- // The executor re-throws exceptions thrown by the task to the uncaught exception handler
- // so we don't need to pass the exception to the handler explicitly
- if (null != t) {
- logAndHandle(t, false);
- }
- }
-
- /**
- * The executor re-throws exceptions thrown by the task to the uncaught exception handler
- * so we only need to do anything if uncaught exception handler has not been se
- */
- private void logAndHandle(Throwable t, boolean passToHandler) {
- if (Thread.getDefaultUncaughtExceptionHandler() == null) {
- LOG.error("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
- }
- else {
- LOG.info("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
- if (passToHandler) {
- Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
- }
- }
- }
-
-
- /**
- * Extract the exception (throwable) inside the ScheduledFutureTask
- * @param runnable - The runable that was executed
- * @return exception enclosed in the Runnable if any; null otherwise
- */
- private Throwable extractThrowable(Runnable runnable) {
- // Check for exceptions wrapped by FutureTask.
- // We do this by calling get(), which will cause it to throw any saved exception.
- // Check for isDone to prevent blocking
- if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone()) {
- try {
- ((Future<?>) runnable).get();
- } catch (CancellationException e) {
- LOG.debug("Task {} cancelled", runnable, e.getCause());
- } catch (InterruptedException e) {
- LOG.debug("Task {} was interrupted", runnable, e);
- } catch (ExecutionException e) {
- return e.getCause();
- }
- }
-
- return null;
- }
-
- void unregisterGauges() {
- this.statsLogger.unregisterGauge(pendingTasksGaugeLabel, pendingTasksGauge);
- this.statsLogger.unregisterGauge(completedTasksGaugeLabel, completedTasksGauge);
- this.statsLogger.unregisterGauge(totalTasksGaugeLabel, totalTasksGauge);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
deleted file mode 100644
index ad1ba4e..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
-import com.twitter.util.TimerTask;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import scala.Function0;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Ordered Scheduler. It is thread pool based {@link ScheduledExecutorService}, additionally providing
- * the ability to execute/schedule tasks by <code>key</code>. Hence the tasks submitted by same <i>key</i>
- * will be executed in order.
- * <p>
- * The scheduler is comprised of multiple {@link MonitoredScheduledThreadPoolExecutor}s. Each
- * {@link MonitoredScheduledThreadPoolExecutor} is a single thread executor. Normal task submissions will
- * be submitted to executors in a random manner to guarantee load balancing. Keyed task submissions (e.g
- * {@link OrderedScheduler#apply(Object, Function0)} will be submitted to a dedicated executor based on
- * the hash value of submit <i>key</i>.
- *
- * <h3>Metrics</h3>
- *
- * <h4>Per Executor Metrics</h4>
- *
- * Metrics about individual executors are exposed via {@link Builder#perExecutorStatsLogger}
- * under <i>`scope`/`name`-executor-`id`-0</i>. `name` is the scheduler name provided by {@link Builder#name}
- * while `id` is the index of this executor in the pool. And corresponding stats of future pool of
- * that executor are exposed under <i>`scope`/`name`-executor-`id`-0/futurepool</i>.
- * <p>
- * See {@link MonitoredScheduledThreadPoolExecutor} and {@link MonitoredFuturePool} for per executor metrics
- * exposed.
- *
- * <h4>Aggregated Metrics</h4>
- * <ul>
- * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on
- * waiting being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on
- * executing.
- * <li>futurepool/task_pending_time: opstats. measuring the characteristics about the time that tasks spent
- * on waiting in future pool being executed.
- * <li>futurepool/task_execution_time: opstats. measuring the characteristics about the time that tasks spent
- * on executing.
- * <li>futurepool/task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on
- * submitting to future pool.
- * <li>futurepool/tasks_pending: gauge. how many tasks are pending in this future pool.
- * </ul>
- */
-public class OrderedScheduler implements ScheduledExecutorService {
-
- /**
- * Create a builder to build scheduler.
- *
- * @return scheduler builder
- */
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * Builder for {@link OrderedScheduler}.
- */
- public static class Builder {
-
- private String name = "OrderedScheduler";
- private int corePoolSize = -1;
- private ThreadFactory threadFactory = null;
- private boolean traceTaskExecution = false;
- private long traceTaskExecutionWarnTimeUs = Long.MAX_VALUE;
- private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
- private StatsLogger perExecutorStatsLogger = NullStatsLogger.INSTANCE;
-
- /**
- * Set the name of this scheduler. It would be used as part of stats scope and thread name.
- *
- * @param name
- * name of the scheduler.
- * @return scheduler builder
- */
- public Builder name(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Set the number of threads to be used in this scheduler.
- *
- * @param corePoolSize the number of threads to keep in the pool, even
- * if they are idle
- * @return scheduler builder
- */
- public Builder corePoolSize(int corePoolSize) {
- this.corePoolSize = corePoolSize;
- return this;
- }
-
- /**
- * Set the thread factory that the scheduler uses to create a new thread.
- *
- * @param threadFactory the factory to use when the executor
- * creates a new thread
- * @return scheduler builder
- */
- public Builder threadFactory(ThreadFactory threadFactory) {
- this.threadFactory = threadFactory;
- return this;
- }
-
- /**
- * Enable/Disable exposing task execution stats.
- *
- * @param trace
- * flag to enable/disable exposing task execution stats.
- * @return scheduler builder
- */
- public Builder traceTaskExecution(boolean trace) {
- this.traceTaskExecution = trace;
- return this;
- }
-
- /**
- * Enable/Disable logging slow tasks whose execution time is above <code>timeUs</code>.
- *
- * @param timeUs
- * slow task execution time threshold in us.
- * @return scheduler builder.
- */
- public Builder traceTaskExecutionWarnTimeUs(long timeUs) {
- this.traceTaskExecutionWarnTimeUs = timeUs;
- return this;
- }
-
- /**
- * Expose the aggregated stats over <code>statsLogger</code>.
- *
- * @param statsLogger
- * stats logger to receive aggregated stats.
- * @return scheduler builder
- */
- public Builder statsLogger(StatsLogger statsLogger) {
- this.statsLogger = statsLogger;
- return this;
- }
-
- /**
- * Expose stats of individual executors over <code>perExecutorStatsLogger</code>.
- * Each executor's stats will be exposed under a sub-scope `name`-executor-`id`-0.
- * `name` is the scheduler name, while `id` is the index of the scheduler in the pool.
- *
- * @param perExecutorStatsLogger
- * stats logger to receive per executor stats.
- * @return scheduler builder
- */
- public Builder perExecutorStatsLogger(StatsLogger perExecutorStatsLogger) {
- this.perExecutorStatsLogger = perExecutorStatsLogger;
- return this;
- }
-
- /**
- * Build the ordered scheduler.
- *
- * @return ordered scheduler
- */
- public OrderedScheduler build() {
- if (corePoolSize <= 0) {
- corePoolSize = Runtime.getRuntime().availableProcessors();
- }
- if (null == threadFactory) {
- threadFactory = Executors.defaultThreadFactory();
- }
-
- return new OrderedScheduler(
- name,
- corePoolSize,
- threadFactory,
- traceTaskExecution,
- traceTaskExecutionWarnTimeUs,
- statsLogger,
- perExecutorStatsLogger);
- }
-
- }
-
- protected final String name;
- protected final int corePoolSize;
- protected final MonitoredScheduledThreadPoolExecutor[] executors;
- protected final MonitoredFuturePool[] futurePools;
- protected final Random random;
-
- private OrderedScheduler(String name,
- int corePoolSize,
- ThreadFactory threadFactory,
- boolean traceTaskExecution,
- long traceTaskExecutionWarnTimeUs,
- StatsLogger statsLogger,
- StatsLogger perExecutorStatsLogger) {
- this.name = name;
- this.corePoolSize = corePoolSize;
- this.executors = new MonitoredScheduledThreadPoolExecutor[corePoolSize];
- this.futurePools = new MonitoredFuturePool[corePoolSize];
- for (int i = 0; i < corePoolSize; i++) {
- ThreadFactory tf = new ThreadFactoryBuilder()
- .setNameFormat(name + "-executor-" + i + "-%d")
- .setThreadFactory(threadFactory)
- .build();
- StatsLogger broadcastStatsLogger =
- BroadCastStatsLogger.masterslave(perExecutorStatsLogger.scope("executor-" + i), statsLogger);
- executors[i] = new MonitoredScheduledThreadPoolExecutor(
- 1, tf, broadcastStatsLogger, traceTaskExecution);
- futurePools[i] = new MonitoredFuturePool(
- new ExecutorServiceFuturePool(executors[i]),
- broadcastStatsLogger.scope("futurepool"),
- traceTaskExecution,
- traceTaskExecutionWarnTimeUs);
- }
- this.random = new Random(System.currentTimeMillis());
- }
-
- protected MonitoredScheduledThreadPoolExecutor chooseExecutor() {
- return corePoolSize == 1 ? executors[0] : executors[random.nextInt(corePoolSize)];
- }
-
- protected MonitoredScheduledThreadPoolExecutor chooseExecutor(Object key) {
- return corePoolSize == 1 ? executors[0] :
- executors[MathUtils.signSafeMod(Objects.hashCode(key), corePoolSize)];
- }
-
- protected FuturePool chooseFuturePool(Object key) {
- return corePoolSize == 1 ? futurePools[0] :
- futurePools[MathUtils.signSafeMod(Objects.hashCode(key), corePoolSize)];
- }
-
- protected FuturePool chooseFuturePool() {
- return corePoolSize == 1 ? futurePools[0] : futurePools[random.nextInt(corePoolSize)];
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- return chooseExecutor().schedule(command, delay, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- return chooseExecutor().schedule(callable, delay, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay, long period, TimeUnit unit) {
- return chooseExecutor().scheduleAtFixedRate(command, initialDelay, period, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
- long initialDelay, long delay, TimeUnit unit) {
- return chooseExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void shutdown() {
- for (MonitoredScheduledThreadPoolExecutor executor : executors) {
- // Unregister gauges
- executor.unregisterGauges();
- executor.shutdown();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<Runnable> shutdownNow() {
- List<Runnable> runnables = new ArrayList<Runnable>();
- for (MonitoredScheduledThreadPoolExecutor executor : executors) {
- runnables.addAll(executor.shutdownNow());
- }
- return runnables;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isShutdown() {
- for (MonitoredScheduledThreadPoolExecutor executor : executors) {
- if (!executor.isShutdown()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isTerminated() {
- for (MonitoredScheduledThreadPoolExecutor executor : executors) {
- if (!executor.isTerminated()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- for (MonitoredScheduledThreadPoolExecutor executor : executors) {
- if (!executor.awaitTermination(timeout, unit)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return chooseExecutor().submit(task);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- return chooseExecutor().submit(task, result);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Future<?> submit(Runnable task) {
- return chooseExecutor().submit(task);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- return chooseExecutor().invokeAll(tasks);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException {
- return chooseExecutor().invokeAll(tasks, timeout, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- return chooseExecutor().invokeAny(tasks);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return chooseExecutor().invokeAny(tasks, timeout, unit);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void execute(Runnable command) {
- chooseExecutor().execute(command);
- }
-
- // Ordered Functions
-
- /**
- * Return a future pool used by <code>key</code>.
- *
- * @param key
- * key to order in the future pool
- * @return future pool
- */
- public FuturePool getFuturePool(Object key) {
- return chooseFuturePool(key);
- }
-
- /**
- * Execute the <code>function</code> in the executor that assigned by <code>key</code>.
- *
- * @see com.twitter.util.Future
- * @param key key of the <i>function</i> to run
- * @param function function to run
- * @return future representing the result of the <i>function</i>
- */
- public <T> com.twitter.util.Future<T> apply(Object key, Function0<T> function) {
- return chooseFuturePool(key).apply(function);
- }
-
- /**
- * Execute the <code>function</code> by the scheduler. It would be submitted to any executor randomly.
- *
- * @param function function to run
- * @return future representing the result of the <i>function</i>
- */
- public <T> com.twitter.util.Future<T> apply(Function0<T> function) {
- return chooseFuturePool().apply(function);
- }
-
- public ScheduledFuture<?> schedule(Object key, Runnable command, long delay, TimeUnit unit) {
- return chooseExecutor(key).schedule(command, delay, unit);
- }
-
- public ScheduledFuture<?> scheduleAtFixedRate(Object key,
- Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit) {
- return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit);
- }
-
- public Future<?> submit(Object key, Runnable command) {
- return chooseExecutor(key).submit(command);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java
deleted file mode 100644
index 15394dc..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-/**
- * A simple limiter interface which tracks acquire/release of permits, for
- * example for tracking outstanding writes.
- */
-public interface PermitLimiter {
-
- public static PermitLimiter NULL_PERMIT_LIMITER = new PermitLimiter() {
- @Override
- public boolean acquire() {
- return true;
- }
- @Override
- public void release(int permits) {
- }
-
- @Override
- public void close() {
-
- }
- };
-
- /**
- * Acquire a permit.
- *
- * @return true if successfully acquire a permit, otherwise false.
- */
- boolean acquire();
-
- /**
- * Release a permit.
- */
- void release(int permits);
-
- /**
- * Close the resources created by the limiter
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java
deleted file mode 100644
index 24c7860..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-public interface PermitManager {
-
- public static interface Permit {
- static final Permit ALLOWED = new Permit() {
- @Override
- public boolean isAllowed() {
- return true;
- }
- };
- boolean isAllowed();
- }
-
- public static PermitManager UNLIMITED_PERMIT_MANAGER = new PermitManager() {
- @Override
- public Permit acquirePermit() {
- return Permit.ALLOWED;
- }
-
- @Override
- public void releasePermit(Permit permit) {
- // nop
- }
-
- @Override
- public boolean allowObtainPermits() {
- return true;
- }
-
- @Override
- public boolean disallowObtainPermits(Permit permit) {
- return false;
- }
-
- @Override
- public void close() {
- // nop
- }
-
- };
-
- /**
- * Obetain a permit from permit manager.
- *
- * @return permit.
- */
- Permit acquirePermit();
-
- /**
- * Release a given permit.
- *
- * @param permit
- * permit to release
- */
- void releasePermit(Permit permit);
-
- /**
- * Allow obtaining permits.
- */
- boolean allowObtainPermits();
-
- /**
- * Disallow obtaining permits. Disallow needs to be performed under the context
- * of <i>permit</i>.
- *
- * @param permit
- * permit context to disallow
- */
- boolean disallowObtainPermits(Permit permit);
-
- /**
- * Release the resources
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java
deleted file mode 100644
index a467d26..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.google.common.base.Preconditions;
-
-import com.twitter.util.Function0;
-import com.twitter.util.FuturePool;
-import com.twitter.util.Future;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.runtime.BoxedUnit;
-
-/**
- * Acts like a future pool, but collects failed apply calls into a queue to be applied
- * in-order on close. This happens either in the close thread or after close is called,
- * in the last operation to complete execution.
- * Ops submitted after close will not be scheduled, so its important to ensure no more
- * ops will be applied once close has been called.
- */
-public class SafeQueueingFuturePool<T> {
-
- static final Logger LOG = LoggerFactory.getLogger(SafeQueueingFuturePool.class);
-
- private boolean closed;
- private int outstanding;
- private ConcurrentLinkedQueue<Function0<T>> queue;
- private FuturePool orderedFuturePool;
-
- public SafeQueueingFuturePool(FuturePool orderedFuturePool) {
- this.closed = false;
- this.outstanding = 0;
- this.queue = new ConcurrentLinkedQueue<Function0<T>>();
- this.orderedFuturePool = orderedFuturePool;
- }
-
- public synchronized Future<T> apply(final Function0<T> fn) {
- Preconditions.checkNotNull(fn);
- if (closed) {
- return Future.exception(new RejectedExecutionException("Operation submitted to closed SafeQueueingFuturePool"));
- }
- ++outstanding;
- queue.add(fn);
- Future<T> result = orderedFuturePool.apply(new Function0<T>() {
- @Override
- public T apply() {
- return queue.poll().apply();
- }
- @Override
- public String toString() {
- return fn.toString();
- }
- }).ensure(new Function0<BoxedUnit>() {
- public BoxedUnit apply() {
- if (decrOutstandingAndCheckDone()) {
- applyAll();
- }
- return null;
- }
- });
- return result;
- }
-
- private synchronized boolean decrOutstandingAndCheckDone() {
- return --outstanding == 0 && closed;
- }
-
- public void close() {
- final boolean done;
- synchronized (this) {
- if (closed) {
- return;
- }
- closed = true;
- done = (outstanding == 0);
- }
- if (done) {
- applyAll();
- }
- }
-
- private void applyAll() {
- if (!queue.isEmpty()) {
- LOG.info("Applying {} items", queue.size());
- }
- while (!queue.isEmpty()) {
- queue.poll().apply();
- }
- }
-
- public synchronized int size() {
- return queue.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java
deleted file mode 100644
index 66e382c..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class SchedulerUtils {
-
- static final Logger logger = LoggerFactory.getLogger(SchedulerUtils.class);
-
- public static void shutdownScheduler(ExecutorService service, long timeout, TimeUnit timeUnit) {
- if (null == service) {
- return;
- }
- service.shutdown();
- try {
- service.awaitTermination(timeout, timeUnit);
- } catch (InterruptedException e) {
- logger.warn("Interrupted when shutting down scheduler : ", e);
- }
- service.shutdownNow();
- }
-
- public static void shutdownScheduler(OrderedSafeExecutor service, long timeout, TimeUnit timeUnit) {
- if (null == service) {
- return;
- }
- service.shutdown();
- try {
- service.awaitTermination(timeout, timeUnit);
- } catch (InterruptedException e) {
- logger.warn("Interrupted when shutting down scheduler : ", e);
- }
- service.forceShutdown(timeout, timeUnit);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java
deleted file mode 100644
index ab8de35..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-/**
- * Sequencer generating transaction id.
- */
-public interface Sequencer {
-
- /**
- * Return next transaction id generated by the sequencer.
- *
- * @return next transaction id generated by the sequencer.
- */
- long nextId();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
index 767ddf6..3697b3f 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,8 +25,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.distributedlog.common.util.PermitLimiter;
/**
* Simple counter based {@link PermitLimiter}.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java
deleted file mode 100644
index 2f606e2..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-/**
- * The {@code Sizable} interface is to provide the capability of calculating size
- * of any objects.
- */
-public interface Sizable {
- /**
- * Calculate the size for this instance.
- *
- * @return size of the instance.
- */
- long size();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java
index 69dfdbe..5bc5af2 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java
@@ -18,6 +18,7 @@
package org.apache.distributedlog.util;
import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.common.util.Sequencer;
/**
* Time based sequencer. It generated non-decreasing transaction id using milliseconds.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java
index 3a623dc..d90a7f8 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,7 +18,7 @@
package org.apache.distributedlog.util;
import com.google.common.annotations.Beta;
-import com.twitter.util.Future;
+import java.util.concurrent.CompletableFuture;
/**
* Util class represents a transaction
@@ -44,7 +44,7 @@ public interface Transaction<OpResult> {
}
/**
- * Listener on the result of an {@link org.apache.distributedlog.util.Transaction.Op}.
+ * Listener on the result of an {@link Transaction.Op}.
*
* @param <OpResult>
*/
@@ -77,12 +77,12 @@ public interface Transaction<OpResult> {
/**
* Execute the current transaction. If the transaction succeed, all operations will be
- * committed (via {@link org.apache.distributedlog.util.Transaction.Op#commit(Object)}.
+ * committed (via {@link Transaction.Op#commit(Object)}.
* Otherwise, all operations will be aborted (via {@link Op#abort(Throwable, Object)}).
*
* @return future representing the result of transaction execution.
*/
- Future<Void> execute();
+ CompletableFuture<Void> execute();
/**
* Abort current transaction. If this is called and the transaction haven't been executed by