You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/12/08 18:15:31 UTC
[GitHub] rabbah closed pull request #2991: controller creates health and cacheInvalidation topics
rabbah closed pull request #2991: controller creates health and cacheInvalidation topics
URL: https://github.com/apache/incubator-openwhisk/pull/2991
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all
index c3b3c12160..0056e96105 100755
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -24,7 +24,7 @@ invoker_arguments: "{{ controller_arguments }}"
invoker_allow_multiple_instances: true
-# Set kafka topic retention
+# Set kafka configuration
kafka_heap: '512m'
kafka_topics_completed_retentionBytes: 104857600
kafka_topics_completed_retentionMS: 300000
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 7467aee7f2..ad77a883ba 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -135,23 +135,6 @@ kafka:
port: 8093
heap: "{{ kafka_heap | default('1g') }}"
replicationFactor: "{{ kafka_replicationFactor | default((groups['kafkas']|length)|int) }}"
- topics:
- completed:
- segmentBytes: 536870912
- retentionBytes: "{{ kafka_topics_completed_retentionBytes | default(1073741824) }}"
- retentionMS: "{{ kafka_topics_completed_retentionMS | default(3600000) }}"
- health:
- segmentBytes: 536870912
- retentionBytes: "{{ kafka_topics_health_retentionBytes | default(1073741824) }}"
- retentionMS: "{{ kafka_topics_health_retentionMS | default(3600000) }}"
- invoker:
- segmentBytes: 536870912
- retentionBytes: "{{ kafka_topics_invoker_retentionBytes | default(1073741824) }}"
- retentionMS: "{{ kafka_topics_invoker_retentionMS | default(172800000) }}"
- cacheInvalidation:
- segmentBytes: 536870912
- retentionBytes: "{{ kafka_topics_cacheInvalidation_retentionBytes | default(1073741824) }}"
- retentionMS: "{{ kafka_topics_cacheInvalidation_retentionMS | default(300000) }}"
kafka_connect_string: "{% set ret = [] %}\
{% for host in groups['kafkas'] %}\
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 6caa545536..82db5d3e3f 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -52,10 +52,16 @@
"WHISK_VERSION_BUILDNO": "{{ docker.image.tag }}"
"KAFKA_HOSTS": "{{ kafka_connect_string }}"
- "KAFKA_TOPICS_COMPLETED_RETENTION_BYTES": "{{ kafka.topics.completed.retentionBytes }}"
- "KAFKA_TOPICS_COMPLETED_RETENTION_MS": "{{ kafka.topics.completed.retentionMS }}"
- "KAFKA_TOPICS_COMPLETED_SEGMENT_BYTES": "{{ kafka.topics.completed.segmentBytes }}"
- "KAFKA_REPLICATIONFACTOR": "{{ kafka.replicationFactor }}"
+ "CONFIG_whisk_kafka_replicationFactor": "{{ kafka.replicationFactor | default() }}"
+ "CONFIG_whisk_kafka_topics_cacheInvalidation_retentionBytes": "{{ kafka_topics_cacheInvalidation_retentionBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_cacheInvalidation_retentionMs": "{{ kafka_topics_cacheInvalidation_retentionMS | default() }}"
+ "CONFIG_whisk_kafka_topics_cacheInvalidation_segmentBytes": "{{ kafka_topics_cacheInvalidation_segmentBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_completed_retentionBytes": "{{ kafka_topics_completed_retentionBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_completed_retentionMs": "{{ kafka_topics_completed_retentionMS | default() }}"
+ "CONFIG_whisk_kafka_topics_completed_segmentBytes": "{{ kafka_topics_completed_segmentBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_health_retentionBytes": "{{ kafka_topics_health_retentionBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_health_retentionMs": "{{ kafka_topics_health_retentionMS | default() }}"
+ "CONFIG_whisk_kafka_topics_health_segmentBytes": "{{ kafka_topics_health_segmentBytes | default() }}"
"DB_PROTOCOL": "{{ db_protocol }}"
"DB_PROVIDER": "{{ db_provider }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 112b37b80f..91d381fb89 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -118,10 +118,10 @@
-e COMPONENT_NAME='invoker{{ groups['invokers'].index(inventory_hostname) }}'
-e PORT='8080'
-e KAFKA_HOSTS='{{ kafka_connect_string }}'
- -e KAFKA_TOPICS_INVOKER_RETENTION_BYTES='{{ kafka.topics.invoker.retentionBytes }}'
- -e KAFKA_TOPICS_INVOKER_RETENTION_MS='{{ kafka.topics.invoker.retentionMS }}'
- -e KAFKA_TOPICS_INVOKER_SEGMENT_BYTES='{{ kafka.topics.invoker.segmentBytes }}'
- -e KAFKA_REPLICATIONFACTOR='{{ kafka.replicationFactor }}'
+ -e CONFIG_whisk_kafka_replicationFactor='{{ kafka.replicationFactor | default() }}'
+ -e CONFIG_whisk_kafka_topics_invoker_retentionBytes='{{ kafka_topics_invoker_retentionBytes | default() }}'
+ -e CONFIG_whisk_kafka_topics_invoker_retentionMs='{{ kafka_topics_invoker_retentionMS | default() }}'
+ -e CONFIG_whisk_kafka_topics_invoker_segmentBytes='{{ kafka_topics_invoker_segmentBytes | default() }}'
-e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
-e DB_PROTOCOL='{{ db_protocol }}'
-e DB_PROVIDER='{{ db_provider }}'
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index 3d5845b164..7c6fedae95 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -32,15 +32,3 @@
until: (('[Kafka Server ' + (groups['kafkas'].index(inventory_hostname)|string) + '], started') in result.stdout)
retries: 10
delay: 5
-
-- name: create the health and the cacheInvalidation topic
- shell: "docker exec kafka0 bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor {{ kafka.replicationFactor }} --partitions 1 --zookeeper $KAFKA_ZOOKEEPER_CONNECT --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
- register: command_result
- failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
- changed_when: "'Created topic' in command_result.stdout"
- with_items:
- - name: health
- settings: "{{ kafka.topics.health }}"
- - name: cacheInvalidation
- settings: "{{ kafka.topics.cacheInvalidation }}"
- when: groups['kafkas'].index(inventory_hostname ) == 0
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 41b41cee6d..dc2d4b1708 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -47,4 +47,33 @@ kamon {
modules {
kamon-statsd.auto-start = yes
}
-}
\ No newline at end of file
+}
+
+whisk {
+ # kafka related configuration
+ kafka {
+ replication-factor = 1
+ topics {
+ cache-invalidation {
+ segment-bytes = 536870912
+ retention-bytes = 1073741824
+ retention-ms = 300000
+ }
+ completed {
+ segment-bytes = 536870912
+ retention-bytes = 1073741824
+ retention-ms = 3600000
+ }
+ health {
+ segment-bytes = 536870912
+ retention-bytes = 1073741824
+ retention-ms = 3600000
+ }
+ invoker {
+ segment-bytes = 536870912
+ retention-bytes = 1073741824
+ retention-ms = 172800000
+ }
+ }
+ }
+}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 599b5bfe72..1cc1ea28dd 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -35,6 +35,19 @@ import whisk.core.connector.MessageConsumer
import whisk.core.connector.MessageProducer
import whisk.core.connector.MessagingProvider
+import pureconfig._
+
+case class KafkaConfig(replicationFactor: Short)
+
+case class TopicConfig(segmentBytes: Long, retentionBytes: Long, retentionMs: Long) {
+ def toMap: Map[String, String] = {
+ Map(
+ "retention.bytes" -> retentionBytes.toString,
+ "retention.ms" -> retentionMs.toString,
+ "segment.bytes" -> segmentBytes.toString)
+ }
+}
+
/**
* A Kafka based implementation of MessagingProvider
*/
@@ -46,15 +59,14 @@ object KafkaMessagingProvider extends MessagingProvider {
def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
new KafkaProducerConnector(config.kafkaHosts, ec)
- def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])(
- implicit logging: Logging): Boolean = {
+ def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
+ val kc = loadConfigOrThrow[KafkaConfig]("whisk.kafka")
+ val tc = loadConfigOrThrow[TopicConfig](s"whisk.kafka.topics.$topicConfig")
val props = new Properties
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
val client = AdminClient.create(props)
- val numPartitions = topicConfig.getOrElse("numPartitions", "1").toInt
- val replicationFactor = topicConfig.getOrElse("replicationFactor", "1").toShort
- val nt = new NewTopic(topic, numPartitions, replicationFactor)
- .configs((topicConfig - ("numPartitions", "replicationFactor")).asJava)
+ val numPartitions = 1
+ val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.toMap.asJava)
val results = client.createTopics(List(nt).asJava)
try {
results.values().get(topic).get()
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 36c17afe0f..96cf1bc5ab 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -96,14 +96,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
val dbActivations = this(WhiskConfig.dbActivations)
val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint)
- val kafkaTopicsInvokerRetentionBytes = this(WhiskConfig.kafkaTopicsInvokerRetentionBytes)
- val kafkaTopicsInvokerRetentionMS = this(WhiskConfig.kafkaTopicsInvokerRetentionMS)
- val kafkaTopicsInvokerSegmentBytes = this(WhiskConfig.kafkaTopicsInvokerSegmentBytes)
- val kafkaTopicsCompletedRetentionBytes = this(WhiskConfig.kafkaTopicsCompletedRetentionBytes)
- val kafkaTopicsCompletedRetentionMS = this(WhiskConfig.kafkaTopicsCompletedRetentionMS)
- val kafkaTopicsCompletedSegmentBytes = this(WhiskConfig.kafkaTopicsCompletedSegmentBytes)
- val kafkaReplicationFactor = this(WhiskConfig.kafkaReplicationFactor)
-
val runtimesManifest = this(WhiskConfig.runtimesManifest)
val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
val actionInvokeConcurrentLimit = this(WhiskConfig.actionInvokeConcurrentLimit)
@@ -242,14 +234,6 @@ object WhiskConfig {
val runtimesManifest = "runtimes.manifest"
- val kafkaTopicsInvokerRetentionBytes = "kafka.topics.invoker.retentionBytes"
- val kafkaTopicsInvokerRetentionMS = "kafka.topics.invoker.retentionMS"
- val kafkaTopicsInvokerSegmentBytes = "kafka.topics.invoker.segmentBytes"
- val kafkaTopicsCompletedRetentionBytes = "kafka.topics.completed.retentionBytes"
- val kafkaTopicsCompletedRetentionMS = "kafka.topics.completed.retentionMS"
- val kafkaTopicsCompletedSegmentBytes = "kafka.topics.completed.segmentBytes"
- val kafkaReplicationFactor = "kafka.replicationFactor"
-
val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
index 7772723317..1737624611 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -34,6 +34,5 @@ trait MessagingProvider extends Spi {
maxPeek: Int = Int.MaxValue,
maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging): MessageConsumer
def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer
- def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])(
- implicit logging: Logging): Boolean
+ def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 80a6a90fba..08b8f1d544 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -212,17 +212,15 @@ object Controller {
}
val msgProvider = SpiLoader.get[MessagingProvider]
- if (!msgProvider.ensureTopic(
- config,
- "completed" + instance,
- Map(
- "numPartitions" -> "1",
- "replicationFactor" -> config.kafkaReplicationFactor,
- "retention.bytes" -> config.kafkaTopicsCompletedRetentionBytes,
- "retention.ms" -> config.kafkaTopicsCompletedRetentionMS,
- "segment.bytes" -> config.kafkaTopicsCompletedSegmentBytes))) {
+ if (!msgProvider.ensureTopic(config, topic = "completed" + instance, topicConfig = "completed")) {
abort(s"failure during msgProvider.ensureTopic for topic completed$instance")
}
+ if (!msgProvider.ensureTopic(config, topic = "health", topicConfig = "health")) {
+ abort(s"failure during msgProvider.ensureTopic for topic health")
+ }
+ if (!msgProvider.ensureTopic(config, topic = "cacheInvalidation", topicConfig = "cache-invalidation")) {
+ abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation")
+ }
ExecManifest.initialize(config) match {
case Success(_) =>
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 5be950df3d..e3776aea53 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -45,7 +45,13 @@ import whisk.core.connector.MessageProducer
import whisk.core.connector.MessagingProvider
import whisk.core.database.NoDocumentException
import whisk.core.entity._
-import whisk.core.entity.size._
+import whisk.core.entity.{ActivationId, WhiskActivation}
+import whisk.core.entity.EntityName
+import whisk.core.entity.ExecutableWhiskActionMetaData
+import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
+import whisk.core.entity.UUID
+import whisk.core.entity.WhiskAction
import whisk.core.entity.types.EntityStore
import whisk.spi.SpiLoader
import pureconfig._
@@ -345,11 +351,6 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
object LoadBalancerService {
def requiredProperties =
kafkaHosts ++
- Map(
- kafkaTopicsCompletedRetentionBytes -> 1024.MB.toBytes.toString,
- kafkaTopicsCompletedRetentionMS -> 1.hour.toMillis.toString,
- kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString,
- kafkaReplicationFactor -> "1") ++
Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
/** Memoizes the result of `f` for later use. */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1a71e29379..77e3da82ae 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -43,7 +43,6 @@ import whisk.core.entity.ExecManifest
import whisk.core.entity.InstanceId
import whisk.core.entity.WhiskActivationStore
import whisk.core.entity.WhiskEntityStore
-import whisk.core.entity.size._
import whisk.http.BasicHttpService
import whisk.spi.SpiLoader
import whisk.utils.ExecutionContextFactory
@@ -62,11 +61,6 @@ object Invoker {
WhiskEntityStore.requiredProperties ++
WhiskActivationStore.requiredProperties ++
kafkaHosts ++
- Map(
- kafkaTopicsInvokerRetentionBytes -> 1024.MB.toBytes.toString,
- kafkaTopicsInvokerRetentionMS -> 48.hour.toMillis.toString,
- kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString,
- kafkaReplicationFactor -> "1") ++
zookeeperHosts ++
wskApiHost ++ Map(
dockerImageTag -> "latest",
@@ -188,15 +182,7 @@ object Invoker {
val invokerInstance = InstanceId(assignedInvokerId)
val msgProvider = SpiLoader.get[MessagingProvider]
- if (!msgProvider.ensureTopic(
- config,
- "invoker" + assignedInvokerId,
- Map(
- "numPartitions" -> "1",
- "replicationFactor" -> config.kafkaReplicationFactor,
- "retention.bytes" -> config.kafkaTopicsInvokerRetentionBytes,
- "retention.ms" -> config.kafkaTopicsInvokerRetentionMS,
- "segment.bytes" -> config.kafkaTopicsInvokerSegmentBytes))) {
+ if (!msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker")) {
abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
}
val producer = msgProvider.getProducer(config, ec)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services