You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2021/05/06 06:42:33 UTC
[openwhisk] branch master updated: Add prefix for topics (#5062)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new aa7e6e2 Add prefix for topics (#5062)
aa7e6e2 is described below
commit aa7e6e2af196ac017ae4b9ea36656bec868a9931
Author: ningyougang <41...@qq.com>
AuthorDate: Thu May 6 14:42:16 2021 +0800
Add prefix for topics (#5062)
- Add prefix for topics
- Add extra prefix for userEvent topic only
---
ansible/group_vars/all | 2 ++
ansible/roles/controller/tasks/deploy.yml | 4 ++++
ansible/roles/invoker/tasks/deploy.yml | 2 ++
common/scala/src/main/resources/application.conf | 4 ++++
.../scala/org/apache/openwhisk/common/UserEvents.scala | 4 +++-
.../scala/org/apache/openwhisk/core/WhiskConfig.scala | 2 ++
.../apache/openwhisk/core/ack/MessagingActiveAck.scala | 8 ++++++--
.../core/database/RemoteCacheInvalidation.scala | 7 ++++---
.../apache/openwhisk/core/controller/Controller.scala | 13 ++++++++-----
.../core/loadBalancer/CommonLoadBalancer.scala | 3 ++-
.../openwhisk/core/loadBalancer/LoadBalancer.scala | 4 +++-
.../loadBalancer/ShardingContainerPoolBalancer.scala | 7 ++++++-
.../org/apache/openwhisk/core/invoker/Invoker.scala | 4 +++-
.../openwhisk/core/invoker/InvokerReactive.scala | 4 ++--
.../apache/openwhisk/core/scheduler/Scheduler.scala | 18 ++++++++++--------
tests/src/test/resources/application.conf.j2 | 4 ++++
16 files changed, 65 insertions(+), 25 deletions(-)
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 6e8a861..6851c70 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -139,6 +139,8 @@ registry:
confdir: "{{ config_root_dir }}/registry"
kafka:
+ topicsPrefix: "{{ kafka_topics_prefix | default('') }}"
+ topicsUserEventPrefix: "{{ kafka_topics_userEvent_prefix | default(kafka_topics_prefix) | default('') }}"
ssl:
client_authentication: required
keystore:
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 64724c4..575aaf4 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -176,6 +176,10 @@
"{{ kafka_topics_health_retentionMS | default() }}"
"CONFIG_whisk_kafka_topics_health_segmentBytes":
"{{ kafka_topics_health_segmentBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_prefix":
+ "{{ kafka.topicsPrefix }}"
+ "CONFIG_whisk_kafka_topics_userEvent_prefix":
+ "{{ kafka.topicsUserEventPrefix }}"
"CONFIG_whisk_kafka_common_securityProtocol":
"{{ kafka.protocol }}"
"CONFIG_whisk_kafka_common_sslTruststoreLocation":
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 8c3c027..fe79439 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -216,6 +216,8 @@
"CONFIG_whisk_kafka_topics_invoker_retentionBytes": "{{ kafka_topics_invoker_retentionBytes | default() }}"
"CONFIG_whisk_kafka_topics_invoker_retentionMs": "{{ kafka_topics_invoker_retentionMS | default() }}"
"CONFIG_whisk_kakfa_topics_invoker_segmentBytes": "{{ kafka_topics_invoker_segmentBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_prefix": "{{ kafka.topicsPrefix }}"
+ "CONFIG_whisk_kafka_topics_userEvent_prefix": "{{ kafka.topicsUserEventPrefix }}"
"CONFIG_whisk_kafka_common_securityProtocol": "{{ kafka.protocol }}"
"CONFIG_whisk_kafka_common_sslTruststoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
"CONFIG_whisk_kafka_common_sslTruststorePassword": "{{ kafka.ssl.keystore.password }}"
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index a894360..230e16d 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -193,6 +193,10 @@ whisk {
retention-bytes = 1073741824
retention-ms = 3600000
}
+ prefix = ""
+ user-event {
+ prefix = ""
+ }
}
metrics {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala
index 5417cfd..f3cdf00 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala
@@ -28,9 +28,11 @@ object UserEvents {
val enabled = loadConfigOrThrow[UserEventsConfig](ConfigKeys.userEvents).enabled
+ val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix)
+
def send(producer: MessageProducer, em: => EventMessage) = {
if (enabled) {
- producer.send("events", em)
+ producer.send(userEventTopicPrefix + "events", em)
}
}
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 19ad39d..1058e53 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -212,6 +212,8 @@ object ConfigKeys {
val kafkaProducer = s"$kafka.producer"
val kafkaConsumer = s"$kafka.consumer"
val kafkaTopics = s"$kafka.topics"
+ val kafkaTopicsPrefix = s"$kafkaTopics.prefix"
+ val kafkaTopicsUserEventPrefix = s"$kafkaTopics.user-event.prefix"
val memory = "whisk.memory"
val timeLimit = "whisk.time-limit"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala
index eb9cce9..b798d88 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala
@@ -19,9 +19,10 @@ package org.apache.openwhisk.core.ack
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer}
import org.apache.openwhisk.core.entity._
-
+import pureconfig._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
@@ -29,6 +30,9 @@ class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventS
implicit logging: Logging,
ec: ExecutionContext)
extends ActiveAck {
+
+ private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+
override def apply(tid: TransactionId,
activationResult: WhiskActivation,
blockingInvoke: Boolean,
@@ -38,7 +42,7 @@ class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventS
implicit val transid: TransactionId = tid
def send(msg: AcknowledegmentMessage, recovery: Boolean = false) = {
- producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
+ producer.send(topic = topicPrefix + "completed" + controllerInstance.asString, msg).andThen {
case Success(_) =>
val info = if (recovery) s"recovery ${msg.messageType}" else msg.messageType
logging.info(this, s"posted $info of activation ${acknowledegment.activationId}")
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
index c8c432b..c5b5e01 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
@@ -24,12 +24,11 @@ import scala.concurrent.duration.DurationInt
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
import akka.actor.ActorSystem
import akka.actor.Props
import spray.json._
import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.Message
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.connector.MessagingProvider
@@ -41,6 +40,7 @@ import org.apache.openwhisk.core.entity.WhiskPackage
import org.apache.openwhisk.core.entity.WhiskRule
import org.apache.openwhisk.core.entity.WhiskTrigger
import org.apache.openwhisk.spi.SpiLoader
+import pureconfig._
case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message {
override def serialize = CacheInvalidationMessage.serdes.write(this).compactPrint
@@ -101,5 +101,6 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance:
}
object RemoteCacheInvalidation {
- val cacheInvalidationTopic = "cacheInvalidation"
+ val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+ val cacheInvalidationTopic = topicPrefix + "cacheInvalidation"
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index 9352196..a8ead9d 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -32,7 +32,7 @@ import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
-import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
@@ -187,6 +187,9 @@ object Controller {
protected val interface = loadConfigOrThrow[String]("whisk.controller.interface")
protected val readinessThreshold = loadConfig[Double]("whisk.controller.readiness-fraction")
+ val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+ val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix)
+
// requiredProperties is a Map whose keys define properties that must be bound to
// a value, and whose values are default values. A null value in the Map means there is
// no default value specified, so it must appear in the properties file
@@ -263,10 +266,10 @@ object Controller {
val msgProvider = SpiLoader.get[MessagingProvider]
Seq(
- ("completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
- ("health", "health", None),
- ("cacheInvalidation", "cache-invalidation", None),
- ("events", "events", None)).foreach {
+ (topicPrefix + "completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+ (topicPrefix + "health", "health", None),
+ (topicPrefix + "cacheInvalidation", "cache-invalidation", None),
+ (userEventTopicPrefix + "events", "events", None)).foreach {
case (topic, topicConfigurationKey, maxMessageBytes) =>
if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) {
abort(s"failure during msgProvider.ensureTopic for topic $topic")
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 3d0cc2f..7820207 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -30,6 +30,7 @@ import pureconfig.generic.auto._
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
@@ -177,7 +178,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
invoker: InvokerInstanceId): Future[RecordMetadata] = {
implicit val transid: TransactionId = msg.transid
- val topic = s"invoker${invoker.toInt}"
+ val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
val start = transid.started(
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index 2022512..ee839b3 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -23,8 +23,10 @@ import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.spi.Spi
+
import scala.concurrent.duration._
/**
@@ -94,7 +96,7 @@ trait LoadBalancerProvider extends Spi {
def createFeedFactory(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem,
logging: Logging): FeedFactory = {
- val activeAckTopic = s"completed${instance.asString}"
+ val activeAckTopic = s"${Controller.topicPrefix}completed${instance.asString}"
val maxActiveAcksPerPoll = 128
val activeAckPollDuration = 1.second
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 28c1a83..14d3ff4 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -36,6 +36,7 @@ import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size.SizeLong
import org.apache.openwhisk.common.LoggingMarkers._
+import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.spi.SpiLoader
@@ -352,7 +353,11 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
InvokerPool.props(
(f, i) => f.actorOf(InvokerActor.props(i, instance)),
(m, i) => sendActivationToInvoker(messagingProducer, m, i),
- messagingProvider.getConsumer(whiskConfig, s"health${instance.asString}", "health", maxPeek = 128),
+ messagingProvider.getConsumer(
+ whiskConfig,
+ s"${Controller.topicPrefix}health${instance.asString}",
+ s"${Controller.topicPrefix}health",
+ maxPeek = 128),
monitor))
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 3d9cc46..65083b1 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -73,6 +73,8 @@ object Invoker {
protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol")
+ val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+
/**
* An object which records the environment variables required for this component to run.
*/
@@ -175,7 +177,7 @@ object Invoker {
initKamon(assignedInvokerId)
val topicBaseName = "invoker"
- val topicName = topicBaseName + assignedInvokerId
+ val topicName = topicPrefix + topicBaseName + assignedInvokerId
val maxMessageBytes = Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)
val invokerInstance =
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index db0dfb4..7ddff8d 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -120,7 +120,7 @@ class InvokerReactive(
}
/** Initialize message consumers */
- private val topic = s"invoker${instance.toInt}"
+ private val topic = s"${Invoker.topicPrefix}invoker${instance.toInt}"
private val maximumContainers = (poolConfig.userMemory / MemoryLimit.MIN_MEMORY).toInt
private val msgProvider = SpiLoader.get[MessagingProvider]
@@ -296,7 +296,7 @@ class InvokerReactive(
private val healthProducer = msgProvider.getProducer(config)
Scheduler.scheduleWaitAtMost(1.seconds)(() => {
- healthProducer.send("health", PingMessage(instance)).andThen {
+ healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen {
case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
}
})
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index d9bb08d..61f9927 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -138,12 +138,9 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
*/
val memoryQueueFactory = "" // TODO: TBD
- val schedulerConsumer = msgProvider.getConsumer(
- config,
- s"scheduler${schedulerId.asString}",
- s"scheduler${schedulerId.asString}",
- maxPeek,
- maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+ val topic = s"${Scheduler.topicPrefix}scheduler${schedulerId.asString}"
+ val schedulerConsumer =
+ msgProvider.getConsumer(config, topic, topic, maxPeek, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
implicit val trasnid = TransactionId.containerCreation
@@ -171,6 +168,8 @@ object Scheduler {
protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+ val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+
/**
* The scheduler has two ports, one for akka-remote and the other for akka-grpc.
*/
@@ -236,8 +235,11 @@ object Scheduler {
val msgProvider = SpiLoader.get[MessagingProvider]
Seq(
- ("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
- ("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))
+ (topicPrefix + "scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+ (
+ topicPrefix + "creationAck" + instanceId.asString,
+ "creationAck",
+ Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))
.foreach {
case (topic, topicConfigurationKey, maxMessageBytes) =>
if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) {
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index cdae2bd..5ce7f31 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -29,6 +29,10 @@ whisk {
retention-bytes = 1073741824
retention-ms = 3600000
}
+ prefix = "{{ kafka.topicsPrefix }}"
+ user-event {
+ prefix = "{{ kafka.topicsUserEventPrefix }}"
+ }
}
common {
security-protocol: {{ kafka.protocol }}