You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:22 UTC
[08/31] incubator-distributedlog git commit: DL-119: Fix the logging on closing readahead worker
DL-119: Fix the logging on closing readahead worker
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/34fa16b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/34fa16b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/34fa16b1
Branch: refs/heads/master
Commit: 34fa16b1d7ab1ae887b1114e8e2aabdffe16608a
Parents: dc4548b
Author: Yiming Zang <yz...@twitter.com>
Authored: Wed Nov 30 18:21:08 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Thu Dec 29 02:09:40 2016 -0800
----------------------------------------------------------------------
.../distributedlog/util/FutureUtils.java | 21 ++++++++++++++++----
.../MonitoredScheduledThreadPoolExecutor.java | 4 ++--
2 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/34fa16b1/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
index 6a647a9..266409e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
@@ -35,11 +35,13 @@ import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import com.twitter.util.Return;
import com.twitter.util.Throw;
+import com.twitter.util.Try;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import javax.annotation.Nullable;
@@ -384,14 +386,25 @@ public class FutureUtils {
if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) {
return promise;
}
- scheduler.schedule(key, new Runnable() {
+ // schedule a timeout to raise timeout exception
+ final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() {
@Override
public void run() {
- logger.info("Raise exception", cause);
- // satisfy the promise
- FutureUtils.setException(promise, cause);
+ if (!promise.isDefined() && FutureUtils.setException(promise, cause)) {
+ logger.info("Raise exception", cause);
+ }
}
}, timeout, unit);
+ // when the promise is satisfied, cancel the timeout task
+ promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Try<T> value) {
+ if (!task.cancel(true)) {
+ logger.debug("Failed to cancel the timeout task");
+ }
+ return BoxedUnit.UNIT;
+ }
+ });
return promise;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/34fa16b1/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
index 512a456..75223f2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
@@ -237,9 +237,9 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe
try {
((Future<?>) runnable).get();
} catch (CancellationException e) {
- LOG.info("Task {} cancelled", runnable, e.getCause());
+ LOG.debug("Task {} cancelled", runnable, e.getCause());
} catch (InterruptedException e) {
- LOG.info("Task {} was interrupted", runnable, e);
+ LOG.debug("Task {} was interrupted", runnable, e);
} catch (ExecutionException e) {
return e.getCause();
}