You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/12/21 21:28:50 UTC

[incubator-openwhisk] branch master updated: make sure to cancel the timeout handler, in LoadBalancerService (#3118)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e90267  make sure to cancel the timeout handler, in LoadBalancerService (#3118)
1e90267 is described below

commit 1e902675bcf6160af8d72d8251739477162f8d76
Author: Nick Mitchell <st...@users.noreply.github.com>
AuthorDate: Thu Dec 21 16:28:47 2017 -0500

    make sure to cancel the timeout handler, in LoadBalancerService (#3118)
    
    Make sure to cancel the timeout handler on the active ack when a response is received. This reduces memory drag.
---
 .../whisk/core/loadBalancer/LoadBalancerData.scala   |  3 +++
 .../core/loadBalancer/LoadBalancerService.scala      | 20 ++++++++++++++------
 .../loadBalancer/test/LoadBalancerDataTests.scala    | 13 ++++++++++---
 3 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index 1866d2d..0018cbb 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -19,11 +19,14 @@ package whisk.core.loadBalancer
 
 import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
 
+import akka.actor.Cancellable
 import scala.concurrent.{Future, Promise}
 
+// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
                            invokerName: InstanceId,
+                           timeoutHandler: Cancellable,
                            promise: Promise[Either[ActivationId, WhiskActivation]])
 trait LoadBalancerData {
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 17d8796..29a169f 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -154,6 +154,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
         // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
         invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
         if (!forced) {
+          entry.timeoutHandler.cancel()
           entry.promise.trySuccess(response)
         } else {
           entry.promise.tryFailure(new Throwable("no active ack received"))
@@ -187,13 +188,20 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
     // in this case, if the activation handler is still registered, remove it and update the books.
     // in case of missing synchronization between n controllers in HA configuration the invoker queue can be overloaded
     // n-1 times and the maximal time for answering with active ack can be n times the action time (plus some overhead)
-    loadBalancerData.putActivation(activationId, {
-      actorSystem.scheduler.scheduleOnce(timeout) {
-        processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
-      }
+    loadBalancerData.putActivation(
+      activationId, {
+        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+          processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
+        }
 
-      ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]())
-    })
+        // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
+        ActivationEntry(
+          activationId,
+          namespaceId,
+          invokerName,
+          timeoutHandler,
+          Promise[Either[ActivationId, WhiskActivation]]())
+      })
   }
 
   /**
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index af51102..9afa67f 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -18,6 +18,7 @@
 package whisk.core.loadBalancer.test
 
 import akka.actor.ActorSystem
+import akka.actor.Cancellable
 import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
 import common.StreamLogging
 import org.scalatest.{FlatSpec, Matchers}
@@ -30,10 +31,16 @@ import whisk.core.entity.InstanceId
 import scala.concurrent.duration._
 
 class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
+  final val emptyCancellable: Cancellable = new Cancellable {
+    def isCancelled = false
+    def cancel() = true
+  }
 
   val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
-  val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(0), activationIdPromise)
-  val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
+  val firstEntry: ActivationEntry =
+    ActivationEntry(ActivationId(), UUID(), InstanceId(0), emptyCancellable, activationIdPromise)
+  val secondEntry: ActivationEntry =
+    ActivationEntry(ActivationId(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
 
   val port = 2552
   val host = "127.0.0.1"
@@ -149,7 +156,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
 
   it should "respond with different values accordingly" in {
 
-    val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
+    val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
     val entrySameInvokerAndNamespace = entry.copy(id = ActivationId())
     val entrySameInvoker = entry.copy(id = ActivationId(), namespaceId = UUID())
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].