You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:22 UTC

[08/31] incubator-distributedlog git commit: DL-119: Fix the logging on closing readahead worker

DL-119: Fix the logging on closing readahead worker


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/34fa16b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/34fa16b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/34fa16b1

Branch: refs/heads/master
Commit: 34fa16b1d7ab1ae887b1114e8e2aabdffe16608a
Parents: dc4548b
Author: Yiming Zang <yz...@twitter.com>
Authored: Wed Nov 30 18:21:08 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Thu Dec 29 02:09:40 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/util/FutureUtils.java        | 21 ++++++++++++++++----
 .../MonitoredScheduledThreadPoolExecutor.java   |  4 ++--
 2 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/34fa16b1/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
index 6a647a9..266409e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
@@ -35,11 +35,13 @@ import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
 import com.twitter.util.Return;
 import com.twitter.util.Throw;
+import com.twitter.util.Try;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
 import scala.runtime.BoxedUnit;
 
 import javax.annotation.Nullable;
@@ -384,14 +386,25 @@ public class FutureUtils {
         if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) {
             return promise;
         }
-        scheduler.schedule(key, new Runnable() {
+        // schedule a timeout to raise timeout exception
+        final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() {
             @Override
             public void run() {
-                logger.info("Raise exception", cause);
-                // satisfy the promise
-                FutureUtils.setException(promise, cause);
+                if (!promise.isDefined() && FutureUtils.setException(promise, cause)) {
+                    logger.info("Raise exception", cause);
+                }
             }
         }, timeout, unit);
+        // when the promise is satisfied, cancel the timeout task
+        promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Try<T> value) {
+                if (!task.cancel(true)) {
+                    logger.debug("Failed to cancel the timeout task");
+                }
+                return BoxedUnit.UNIT;
+            }
+        });
         return promise;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/34fa16b1/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
index 512a456..75223f2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
@@ -237,9 +237,9 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe
             try {
                 ((Future<?>) runnable).get();
             } catch (CancellationException e) {
-                LOG.info("Task {} cancelled", runnable, e.getCause());
+                LOG.debug("Task {} cancelled", runnable, e.getCause());
             } catch (InterruptedException e) {
-                LOG.info("Task {} was interrupted", runnable, e);
+                LOG.debug("Task {} was interrupted", runnable, e);
             } catch (ExecutionException e) {
                 return e.getCause();
             }