You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text version.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/12/19 22:49:52 UTC

[GitHub] starpit closed pull request #3115: remove drag from PrimitiveActions, LoadBalancerService, and Scheduler

starpit closed pull request #3115: remove drag from PrimitiveActions, LoadBalancerService, and Scheduler
URL: https://github.com/apache/incubator-openwhisk/pull/3115
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a PR is merged, the diff is
reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/common/scala/src/main/scala/whisk/common/Scheduler.scala b/common/scala/src/main/scala/whisk/common/Scheduler.scala
index c4bceeda96..e55fbf9032 100644
--- a/common/scala/src/main/scala/whisk/common/Scheduler.scala
+++ b/common/scala/src/main/scala/whisk/common/Scheduler.scala
@@ -17,6 +17,7 @@
 
 package whisk.common
 
+import java.util.concurrent.atomic.AtomicReference
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Failure
@@ -53,6 +54,8 @@ object Scheduler {
 
     var lastSchedule: Option[Cancellable] = None
 
+    val cref = new AtomicReference(closure)
+
     override def preStart() = {
       if (initialDelay != Duration.Zero) {
         lastSchedule = Some(context.system.scheduler.scheduleOnce(initialDelay, self, ScheduledWork))
@@ -62,24 +65,32 @@ object Scheduler {
     }
     override def postStop() = {
       logging.info(this, s"$name shutdown")
+      cref.set(null)
       lastSchedule.foreach(_.cancel())
     }
 
     def receive = {
-      case WorkOnceNow => Try(closure())
+      case WorkOnceNow =>
+        val closure = cref.getAndSet(null)
+        if (closure != null) {
+          Try(closure())
+        }
 
       case ScheduledWork =>
         val deadline = interval.fromNow
-        Try(closure()) match {
-          case Success(result) =>
-            result onComplete { _ =>
-              val timeToWait = if (alwaysWait) interval else deadline.timeLeft.max(Duration.Zero)
-              // context might be null here if a PoisonPill is sent while doing computations
-              lastSchedule = Option(context).map(_.system.scheduler.scheduleOnce(timeToWait, self, ScheduledWork))
-            }
+        val closure = cref.getAndSet(null)
+        if (closure != null) {
+          Try(closure()) match {
+            case Success(result) =>
+              result onComplete { _ =>
+                val timeToWait = if (alwaysWait) interval else deadline.timeLeft.max(Duration.Zero)
+                // context might be null here if a PoisonPill is sent while doing computations
+                lastSchedule = Option(context).map(_.system.scheduler.scheduleOnce(timeToWait, self, ScheduledWork))
+              }
 
-          case Failure(e) =>
-            logging.error(name, s"halted because ${e.getMessage}")
+            case Failure(e) =>
+              logging.error(name, s"halted because ${e.getMessage}")
+          }
         }
     }
   }
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index f604e63b33..6d49e407ab 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -17,6 +17,7 @@
 
 package whisk.core.controller.actions
 
+import java.util.concurrent.atomic.AtomicReference
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
@@ -166,16 +167,23 @@ protected[actions] trait PrimitiveActions {
 
     logging.info(this, s"action activation will block for result upto $totalWaitTime")
 
+    val finisherRef = new AtomicReference(finisher)
     activeAckResponse map {
       case result @ Right(_) =>
         // activation complete, result is available
-        finisher ! ActivationFinisher.Finish(result)
+        val finisher = finisherRef.getAndSet(null)
+        if (finisher != null) {
+           finisher ! ActivationFinisher.Finish(result)
+        }
 
       case _ =>
         // active ack received but it does not carry the response,
         // no result available except by polling the db
         logging.warn(this, "pre-emptively polling db because active ack is missing result")
-        finisher ! Scheduler.WorkOnceNow
+        val finisher = finisherRef.getAndSet(null)
+        if (finisher != null) {
+           finisher ! Scheduler.WorkOnceNow
+        }
     }
 
     // return the promise which is either fulfilled by active ack, polling from the database,
@@ -185,7 +193,11 @@ protected[actions] trait PrimitiveActions {
       totalWaitTime, {
         Future.successful(Left(activationId)).andThen {
           // result no longer interesting; terminate the finisher/shut down db polling if necessary
-          case _ => actorSystem.stop(finisher)
+          case _ =>
+             val finisher = finisherRef.getAndSet(null)
+             if (finisher != null) {
+                actorSystem.stop(finisher)
+             }
         }
       })
   }
@@ -196,6 +208,7 @@ protected[actions] object ActivationFinisher {
   case class Finish(activation: Right[ActivationId, WhiskActivation])
 
   private type ActivationLookup = () => Future[WhiskActivation]
+  private type ActivationPromise = AtomicReference[Promise[Either[ActivationId, WhiskActivation]]]
 
   /** Periodically polls the db to cover missing active acks. */
   private val datastorePollPeriodForActivation = 15.seconds
@@ -214,7 +227,7 @@ protected[actions] object ActivationFinisher {
     logging: Logging): (Future[Either[ActivationId, WhiskActivation]], ActorRef) = {
 
     val (p, _, f) = props(activationLookup, datastorePollPeriodForActivation, datastorePreemptivePolling)
-    (p.future, f) // hides the polling actor
+    (p.get.future, f) // hides the polling actor
   }
 
   /**
@@ -227,10 +240,10 @@ protected[actions] object ActivationFinisher {
     implicit transid: TransactionId,
     actorSystem: ActorSystem,
     executionContext: ExecutionContext,
-    logging: Logging): (Promise[Either[ActivationId, WhiskActivation]], ActorRef, ActorRef) = {
+    logging: Logging): (ActivationPromise, ActorRef, ActorRef) = {
 
     // this is strictly completed by the finishing actor
-    val promise = Promise[Either[ActivationId, WhiskActivation]]
+    val promise = new AtomicReference(Promise[Either[ActivationId, WhiskActivation]])
     val dbpoller = poller(slowPoll, promise, activationLookup)
     val finisher = Props(new ActivationFinisher(dbpoller, fastPolls, promise))
 
@@ -247,7 +260,7 @@ protected[actions] object ActivationFinisher {
    */
   private class ActivationFinisher(poller: ActorRef, // the activation poller
                                    fastPollPeriods: Seq[FiniteDuration],
-                                   promise: Promise[Either[ActivationId, WhiskActivation]])(
+                                   prom: ActivationPromise)(
     implicit transid: TransactionId,
     actorSystem: ActorSystem,
     executionContext: ExecutionContext,
@@ -255,13 +268,16 @@ protected[actions] object ActivationFinisher {
       extends Actor {
 
     // when the future completes, self-destruct
-    promise.future.andThen { case _ => shutdown() }
+    prom.get.future.andThen { case _ => shutdown() }
 
     var preemptiveMsgs = Vector.empty[Cancellable]
 
     def receive = {
       case ActivationFinisher.Finish(activation) =>
-        promise.trySuccess(activation)
+        val promise = prom.getAndSet(null)
+        if (promise != null) {
+           promise.trySuccess(activation)
+        }
 
       case msg @ Scheduler.WorkOnceNow =>
         // try up to three times when pre-emptying the schedule
@@ -271,6 +287,7 @@ protected[actions] object ActivationFinisher {
     }
 
     def shutdown(): Unit = {
+      prom.set(null) // just in case
       preemptiveMsgs.foreach(_.cancel())
       preemptiveMsgs = Vector.empty
       context.stop(poller)
@@ -279,6 +296,7 @@ protected[actions] object ActivationFinisher {
 
     override def postStop() = {
       logging.info(this, "finisher shutdown")
+      prom.set(null) // just in case
       preemptiveMsgs.foreach(_.cancel())
       preemptiveMsgs = Vector.empty
       context.stop(poller)
@@ -290,13 +308,14 @@ protected[actions] object ActivationFinisher {
    * It is a factory method to facilitate testing.
    */
   private def poller(slowPollPeriod: FiniteDuration,
-                     promise: Promise[Either[ActivationId, WhiskActivation]],
+                     prom: ActivationPromise,
                      activationLookup: ActivationLookup)(implicit transid: TransactionId,
                                                          actorSystem: ActorSystem,
                                                          executionContext: ExecutionContext,
                                                          logging: Logging): ActorRef = {
     Scheduler.scheduleWaitAtMost(slowPollPeriod, initialDelay = slowPollPeriod, name = "dbpoll")(() => {
-      if (!promise.isCompleted) {
+      val promise = prom.getAndSet(null)
+      if (promise != null && !promise.isCompleted) {
         activationLookup() map {
           // complete the future, which in turn will poison pill this scheduler
           activation =>
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
index 34b5d6708f..ac646e549f 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
@@ -85,6 +85,6 @@ class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: Lo
   }
 
   def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(aid).flatMap(removeActivation)
+    activationsById.get(aid).flatMap(entry => removeActivation(entry))
   }
 }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index 1866d2dcba..a523260bb9 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -19,11 +19,14 @@ package whisk.core.loadBalancer
 
 import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
 
+import akka.actor.Cancellable
 import scala.concurrent.{Future, Promise}
 
+// please note: the timeoutHandler *must* be .cancel()'d on any non-timeout paths, else memory drags
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
                            invokerName: InstanceId,
+                           timeoutHandler: Cancellable,
                            promise: Promise[Either[ActivationId, WhiskActivation]])
 trait LoadBalancerData {
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 17d8796dca..1a2aaf3ab6 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -154,6 +154,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
         // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
         invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
         if (!forced) {
+          entry.timeoutHandler.cancel()
           entry.promise.trySuccess(response)
         } else {
           entry.promise.tryFailure(new Throwable("no active ack received"))
@@ -188,11 +189,12 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
     // in case of missing synchronization between n controllers in HA configuration the invoker queue can be overloaded
     // n-1 times and the maximal time for answering with active ack can be n times the action time (plus some overhead)
     loadBalancerData.putActivation(activationId, {
-      actorSystem.scheduler.scheduleOnce(timeout) {
+      val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
         processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
       }
 
-      ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]())
+      // please note: the timeoutHandler *must* be .cancel()'d on any non-timeout paths, else memory drags
+      ActivationEntry(activationId, namespaceId, invokerName, timeoutHandler, Promise[Either[ActivationId, WhiskActivation]]())
     })
   }
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
index 92e3789e76..8ee7f5d17f 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
@@ -71,6 +71,6 @@ class LocalLoadBalancerData() extends LoadBalancerData {
   }
 
   override def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(aid).flatMap(removeActivation)
+    activationsById.get(aid).flatMap(entry => removeActivation(entry))
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services