You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/02/26 16:26:15 UTC
[incubator-openwhisk] branch master updated: Put active-ack consumers in their own consumer-groups. (#3337)
This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new c184cc6 Put active-ack consumers in their own consumer-groups. (#3337)
c184cc6 is described below
commit c184cc6fdabbe857bfc1bd33aa7acda07179d1e5
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Mon Feb 26 17:26:12 2018 +0100
Put active-ack consumers in their own consumer-groups. (#3337)
Just like with the invoker consumers, it doesn't make sense to have those in one group, as a crash of one will cause a rebalancing pause for the others.
---
.../scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala | 8 +++-----
.../whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala | 7 ++-----
2 files changed, 5 insertions(+), 10 deletions(-)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index dfa57bb..de2e56e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -205,14 +205,12 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
* Subscribes to active acks (completion messages from the invokers), and
* registers a handler for received active acks from invokers.
*/
+ val activeAckTopic = s"completed${controllerInstance.toInt}"
val maxActiveAcksPerPoll = 128
val activeAckPollDuration = 1.second
private val activeAckConsumer =
- messagingProvider.getConsumer(
- config,
- "completions",
- s"completed${controllerInstance.toInt}",
- maxPeek = maxActiveAcksPerPoll)
+ messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll)
+
val activationFeed = actorSystem.actorOf(Props {
new MessageFeed(
"activeack",
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 607670d..9b7aaec 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -205,14 +205,11 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
* Subscribes to active acks (completion messages from the invokers), and
* registers a handler for received active acks from invokers.
*/
+ private val activeAckTopic = s"completed${controllerInstance.toInt}"
private val maxActiveAcksPerPoll = 128
private val activeAckPollDuration = 1.second
private val activeAckConsumer =
- messagingProvider.getConsumer(
- config,
- "completions",
- s"completed${controllerInstance.toInt}",
- maxPeek = maxActiveAcksPerPoll)
+ messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll)
private val activationFeed = actorSystem.actorOf(Props {
new MessageFeed(
--
To stop receiving notification emails like this one, please contact
cbickel@apache.org.