You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/02/26 16:26:15 UTC

[incubator-openwhisk] branch master updated: Put active-ack consumers in their own consumer-groups. (#3337)

This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new c184cc6  Put active-ack consumers in their own consumer-groups. (#3337)
c184cc6 is described below

commit c184cc6fdabbe857bfc1bd33aa7acda07179d1e5
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Mon Feb 26 17:26:12 2018 +0100

    Put active-ack consumers in their own consumer-groups. (#3337)
    
    Just like with the invoker consumers, it doesn't make sense to have those in one group, as a crash of one will cause a rebalancing pause for the other.
---
 .../scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala     | 8 +++-----
 .../whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala   | 7 ++-----
 2 files changed, 5 insertions(+), 10 deletions(-)

diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index dfa57bb..de2e56e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -205,14 +205,12 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
    * Subscribes to active acks (completion messages from the invokers), and
    * registers a handler for received active acks from invokers.
    */
+  val activeAckTopic = s"completed${controllerInstance.toInt}"
   val maxActiveAcksPerPoll = 128
   val activeAckPollDuration = 1.second
   private val activeAckConsumer =
-    messagingProvider.getConsumer(
-      config,
-      "completions",
-      s"completed${controllerInstance.toInt}",
-      maxPeek = maxActiveAcksPerPoll)
+    messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll)
+
   val activationFeed = actorSystem.actorOf(Props {
     new MessageFeed(
       "activeack",
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 607670d..9b7aaec 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -205,14 +205,11 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
    * Subscribes to active acks (completion messages from the invokers), and
    * registers a handler for received active acks from invokers.
    */
+  private val activeAckTopic = s"completed${controllerInstance.toInt}"
   private val maxActiveAcksPerPoll = 128
   private val activeAckPollDuration = 1.second
   private val activeAckConsumer =
-    messagingProvider.getConsumer(
-      config,
-      "completions",
-      s"completed${controllerInstance.toInt}",
-      maxPeek = maxActiveAcksPerPoll)
+    messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll)
 
   private val activationFeed = actorSystem.actorOf(Props {
     new MessageFeed(

-- 
To stop receiving notification emails like this one, please contact
cbickel@apache.org.