You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/12/21 21:28:50 UTC
[incubator-openwhisk] branch master updated: make sure to cancel
the timeout handler, in LoadBalancerService (#3118)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 1e90267 make sure to cancel the timeout handler, in LoadBalancerService (#3118)
1e90267 is described below
commit 1e902675bcf6160af8d72d8251739477162f8d76
Author: Nick Mitchell <st...@users.noreply.github.com>
AuthorDate: Thu Dec 21 16:28:47 2017 -0500
make sure to cancel the timeout handler, in LoadBalancerService (#3118)
Make sure to cancel the timeout handler on the active ack when a response is received. This reduces memory drag.
---
.../whisk/core/loadBalancer/LoadBalancerData.scala | 3 +++
.../core/loadBalancer/LoadBalancerService.scala | 20 ++++++++++++++------
.../loadBalancer/test/LoadBalancerDataTests.scala | 13 ++++++++++---
3 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index 1866d2d..0018cbb 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -19,11 +19,14 @@ package whisk.core.loadBalancer
import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
+import akka.actor.Cancellable
import scala.concurrent.{Future, Promise}
+// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
case class ActivationEntry(id: ActivationId,
namespaceId: UUID,
invokerName: InstanceId,
+ timeoutHandler: Cancellable,
promise: Promise[Either[ActivationId, WhiskActivation]])
trait LoadBalancerData {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 17d8796..29a169f 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -154,6 +154,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
// the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
if (!forced) {
+ entry.timeoutHandler.cancel()
entry.promise.trySuccess(response)
} else {
entry.promise.tryFailure(new Throwable("no active ack received"))
@@ -187,13 +188,20 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
// in this case, if the activation handler is still registered, remove it and update the books.
// in case of missing synchronization between n controllers in HA configuration the invoker queue can be overloaded
// n-1 times and the maximal time for answering with active ack can be n times the action time (plus some overhead)
- loadBalancerData.putActivation(activationId, {
- actorSystem.scheduler.scheduleOnce(timeout) {
- processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
- }
+ loadBalancerData.putActivation(
+ activationId, {
+ val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+ processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
+ }
- ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]())
- })
+ // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
+ ActivationEntry(
+ activationId,
+ namespaceId,
+ invokerName,
+ timeoutHandler,
+ Promise[Either[ActivationId, WhiskActivation]]())
+ })
}
/**
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index af51102..9afa67f 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -18,6 +18,7 @@
package whisk.core.loadBalancer.test
import akka.actor.ActorSystem
+import akka.actor.Cancellable
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import common.StreamLogging
import org.scalatest.{FlatSpec, Matchers}
@@ -30,10 +31,16 @@ import whisk.core.entity.InstanceId
import scala.concurrent.duration._
class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
+ final val emptyCancellable: Cancellable = new Cancellable {
+ def isCancelled = false
+ def cancel() = true
+ }
val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
- val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(0), activationIdPromise)
- val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
+ val firstEntry: ActivationEntry =
+ ActivationEntry(ActivationId(), UUID(), InstanceId(0), emptyCancellable, activationIdPromise)
+ val secondEntry: ActivationEntry =
+ ActivationEntry(ActivationId(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
val port = 2552
val host = "127.0.0.1"
@@ -149,7 +156,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
it should "respond with different values accordingly" in {
- val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
+ val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
val entrySameInvokerAndNamespace = entry.copy(id = ActivationId())
val entrySameInvoker = entry.copy(id = ActivationId(), namespaceId = UUID())
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].