You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/12/21 22:47:36 UTC

[GitHub] starpit closed pull request #3127: fix PrimitiveActions drag of WhiskActivations

starpit closed pull request #3127: fix PrimitiveActions drag of WhiskActivations
URL: https://github.com/apache/incubator-openwhisk/pull/3127
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
index 88764d3f52..c8da25ed15 100644
--- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
+++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
@@ -23,38 +23,61 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
+import scala.util.control.NonFatal
 
 import akka.actor.ActorSystem
-import akka.pattern.{after => expire}
+import akka.actor.Cancellable
+import akka.actor.Scheduler
 
 object ExecutionContextFactory {
 
-  // Future.firstCompletedOf has a memory drag bug
-  // https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism
-  def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
+  private type CancellableFuture[T] = (Cancellable, Future[T])
+
+  /**
+   * akka.pattern.after has a memory drag issue: it opaquely
+   * schedules an actor which consequently results in drag for the
+   * timeout duration
+   *
+   */
+  def expire[T](duration: FiniteDuration, using: Scheduler)(value: ⇒ Future[T])(
+    implicit ec: ExecutionContext): CancellableFuture[T] = {
     val p = Promise[T]()
-    val pref = new java.util.concurrent.atomic.AtomicReference(p)
-    val completeFirst: Try[T] => Unit = { result: Try[T] =>
-      val promise = pref.getAndSet(null)
-      if (promise != null) {
-        promise.tryComplete(result)
+    val cancellable = using.scheduleOnce(duration) {
+      p completeWith {
+        try value
+        catch { case NonFatal(t) ⇒ Future.failed(t) }
       }
     }
-    futures foreach { _ onComplete completeFirst }
+    (cancellable, p.future)
+  }
+
+  /**
+   * Return the first of the two given futures to complete; if f1
+   * finishes first, we will cancel f2
+   *
+   */
+  def firstCompletedOf2[T](f1: Future[T], f2: CancellableFuture[T])(implicit executor: ExecutionContext): Future[T] = {
+    val p = Promise[T]()
+
+    f1 onComplete { result =>
+      p.tryComplete(result)
+      f2._1.cancel()
+    }
+    f2._2 onComplete p.tryComplete
+
     p.future
   }
 
   implicit class FutureExtensions[T](f: Future[T]) {
     def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit system: ActorSystem): Future[T] = {
       implicit val ec = system.dispatcher
-      firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(Future.failed(msg))))
+      firstCompletedOf2(f, expire(timeout, system.scheduler)(Future.failed(msg)))
     }
 
     def withAlternativeAfterTimeout(timeout: FiniteDuration, alt: => Future[T])(
       implicit system: ActorSystem): Future[T] = {
       implicit val ec = system.dispatcher
-      firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(alt)))
+      firstCompletedOf2(f, expire(timeout, system.scheduler)(alt))
     }
   }
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index 1866d2dcba..0018cbbd8b 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -19,11 +19,14 @@ package whisk.core.loadBalancer
 
 import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
 
+import akka.actor.Cancellable
 import scala.concurrent.{Future, Promise}
 
+// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
                            invokerName: InstanceId,
+                           timeoutHandler: Cancellable,
                            promise: Promise[Either[ActivationId, WhiskActivation]])
 trait LoadBalancerData {
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 17d8796dca..29a169fe7a 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -154,6 +154,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
         // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
         invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
         if (!forced) {
+          entry.timeoutHandler.cancel()
           entry.promise.trySuccess(response)
         } else {
           entry.promise.tryFailure(new Throwable("no active ack received"))
@@ -187,13 +188,20 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
     // in this case, if the activation handler is still registered, remove it and update the books.
     // in case of missing synchronization between n controllers in HA configuration the invoker queue can be overloaded
     // n-1 times and the maximal time for answering with active ack can be n times the action time (plus some overhead)
-    loadBalancerData.putActivation(activationId, {
-      actorSystem.scheduler.scheduleOnce(timeout) {
-        processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
-      }
+    loadBalancerData.putActivation(
+      activationId, {
+        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+          processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
+        }
 
-      ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]())
-    })
+        // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
+        ActivationEntry(
+          activationId,
+          namespaceId,
+          invokerName,
+          timeoutHandler,
+          Promise[Either[ActivationId, WhiskActivation]]())
+      })
   }
 
   /**
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index af511021de..9afa67f575 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -18,6 +18,7 @@
 package whisk.core.loadBalancer.test
 
 import akka.actor.ActorSystem
+import akka.actor.Cancellable
 import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
 import common.StreamLogging
 import org.scalatest.{FlatSpec, Matchers}
@@ -30,10 +31,16 @@ import whisk.core.entity.InstanceId
 import scala.concurrent.duration._
 
 class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
+  final val emptyCancellable: Cancellable = new Cancellable {
+    def isCancelled = false
+    def cancel() = true
+  }
 
   val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
-  val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(0), activationIdPromise)
-  val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
+  val firstEntry: ActivationEntry =
+    ActivationEntry(ActivationId(), UUID(), InstanceId(0), emptyCancellable, activationIdPromise)
+  val secondEntry: ActivationEntry =
+    ActivationEntry(ActivationId(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
 
   val port = 2552
   val host = "127.0.0.1"
@@ -149,7 +156,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
 
   it should "respond with different values accordingly" in {
 
-    val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
+    val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
     val entrySameInvokerAndNamespace = entry.copy(id = ActivationId())
     val entrySameInvoker = entry.copy(id = ActivationId(), namespaceId = UUID())
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services