You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/01/02 22:40:30 UTC

[incubator-openwhisk] branch master updated: fix memory drag in PrimitiveActions, and clean up withAlternateAfterTimeout (#3129)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 1293647  fix memory drag in PrimitiveActions, and clean up withAlternateAfterTimeout (#3129)
1293647 is described below

commit 1293647f96829df4c0ea90e815dc2b6fc664de11
Author: Nick Mitchell <st...@users.noreply.github.com>
AuthorDate: Tue Jan 2 17:40:27 2018 -0500

    fix memory drag in PrimitiveActions, and clean up withAlternateAfterTimeout (#3129)
---
 .../whisk/utils/ExecutionContextFactory.scala      | 51 ++++++++++++++++------
 1 file changed, 38 insertions(+), 13 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
index 88764d3..ea025cb 100644
--- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
+++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
@@ -23,38 +23,63 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
+import scala.util.control.NonFatal
 
 import akka.actor.ActorSystem
-import akka.pattern.{after => expire}
+import akka.actor.Cancellable
+import akka.actor.Scheduler
 
 object ExecutionContextFactory {
 
-  // Future.firstCompletedOf has a memory drag bug
-  // https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism
-  def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
+  private type CancellableFuture[T] = (Cancellable, Future[T])
+
+  /**
+   * akka.pattern.after has a memory drag issue: it opaquely
+   * schedules an actor which consequently results in drag for the
+   * timeout duration
+   *
+   */
+  def expire[T](duration: FiniteDuration, using: Scheduler)(value: ⇒ Future[T])(
+    implicit ec: ExecutionContext): CancellableFuture[T] = {
     val p = Promise[T]()
-    val pref = new java.util.concurrent.atomic.AtomicReference(p)
-    val completeFirst: Try[T] => Unit = { result: Try[T] =>
-      val promise = pref.getAndSet(null)
-      if (promise != null) {
-        promise.tryComplete(result)
+    val cancellable = using.scheduleOnce(duration) {
+      p completeWith {
+        try value
+        catch { case NonFatal(t) ⇒ Future.failed(t) }
       }
     }
-    futures foreach { _ onComplete completeFirst }
+    (cancellable, p.future)
+  }
+
+  /**
+   * Return the first of the two given futures to complete; if f1
+   * finishes first, we will cancel f2
+   *
+   */
+  def firstCompletedOf2[T](f1: Future[T], f2Cancellable: CancellableFuture[T])(
+    implicit executor: ExecutionContext): Future[T] = {
+    val p = Promise[T]()
+    val (f2Killswitch, f2) = f2Cancellable
+
+    f1.onComplete { result =>
+      p.tryComplete(result)
+      f2Killswitch.cancel()
+    }
+    f2.onComplete(p.tryComplete)
+
     p.future
   }
 
   implicit class FutureExtensions[T](f: Future[T]) {
     def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit system: ActorSystem): Future[T] = {
       implicit val ec = system.dispatcher
-      firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(Future.failed(msg))))
+      firstCompletedOf2(f, expire(timeout, system.scheduler)(Future.failed(msg)))
     }
 
     def withAlternativeAfterTimeout(timeout: FiniteDuration, alt: => Future[T])(
       implicit system: ActorSystem): Future[T] = {
       implicit val ec = system.dispatcher
-      firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(alt)))
+      firstCompletedOf2(f, expire(timeout, system.scheduler)(alt))
     }
   }
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].