You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/01/02 07:49:24 UTC

[GitHub] markusthoemmes commented on a change in pull request #3129: fix memory drag in PrimitiveActions, and clean up withAlternateAfterTimeout

markusthoemmes commented on a change in pull request #3129: fix memory drag in PrimitiveActions, and clean up withAlternateAfterTimeout
URL: https://github.com/apache/incubator-openwhisk/pull/3129#discussion_r159182293
 
 

 ##########
 File path: common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
 ##########
 @@ -23,38 +23,61 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
+import scala.util.control.NonFatal
 
 import akka.actor.ActorSystem
-import akka.pattern.{after => expire}
+import akka.actor.Cancellable
+import akka.actor.Scheduler
 
 object ExecutionContextFactory {
 
-  // Future.firstCompletedOf has a memory drag bug
-  // https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism
-  def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
+  private type CancellableFuture[T] = (Cancellable, Future[T])
+
+  /**
+   * akka.pattern.after has a memory drag issue: it opaquely
+   * schedules an actor which consequently results in drag for the
+   * timeout duration
+   *
+   */
+  def expire[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(
+    implicit ec: ExecutionContext): CancellableFuture[T] = {
     val p = Promise[T]()
-    val pref = new java.util.concurrent.atomic.AtomicReference(p)
-    val completeFirst: Try[T] => Unit = { result: Try[T] =>
-      val promise = pref.getAndSet(null)
-      if (promise != null) {
-        promise.tryComplete(result)
+    val cancellable = using.scheduleOnce(duration) {
+      p completeWith {
+        try value
+        catch { case NonFatal(t) => Future.failed(t) }
       }
     }
-    futures foreach { _ onComplete completeFirst }
+    (cancellable, p.future)
+  }
+
+  /**
+   * Return the first of the two given futures to complete; if f1
+   * finishes first, we will cancel f2
+   *
+   */
+  def firstCompletedOf2[T](f1: Future[T], f2: CancellableFuture[T])(implicit executor: ExecutionContext): Future[T] = {
+    val p = Promise[T]()
+
+    f1 onComplete { result =>
+      p.tryComplete(result)
+      f2._1.cancel()
+    }
+    f2._2 onComplete p.tryComplete
 
 Review comment:
   Could use some destructuring for ease of reading:
   
   ```scala
   val (f2Killswitch, f2) = f2Cancellable
   
   f1.onComplete { result =>
     p.tryComplete(result)
     f2Killswitch.cancel()
   }
   
   f2.onComplete(p.tryComplete)
   ```
   
   WDYT?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services