You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/12/08 18:15:31 UTC

[GitHub] rabbah closed pull request #2991: controller creates health and cacheInvalidation topics

rabbah closed pull request #2991: controller creates health and cacheInvalidation topics
URL: https://github.com/apache/incubator-openwhisk/pull/2991
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all
index c3b3c12160..0056e96105 100755
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -24,7 +24,7 @@ invoker_arguments: "{{ controller_arguments }}"
 
 invoker_allow_multiple_instances: true
 
-# Set kafka topic retention
+# Set kafka configuration
 kafka_heap: '512m'
 kafka_topics_completed_retentionBytes: 104857600
 kafka_topics_completed_retentionMS: 300000
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 7467aee7f2..ad77a883ba 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -135,23 +135,6 @@ kafka:
     port: 8093
   heap: "{{ kafka_heap | default('1g') }}"
   replicationFactor: "{{ kafka_replicationFactor | default((groups['kafkas']|length)|int) }}"
-  topics:
-    completed:
-      segmentBytes: 536870912
-      retentionBytes: "{{ kafka_topics_completed_retentionBytes | default(1073741824) }}"
-      retentionMS: "{{ kafka_topics_completed_retentionMS | default(3600000) }}"
-    health:
-      segmentBytes: 536870912
-      retentionBytes: "{{ kafka_topics_health_retentionBytes | default(1073741824) }}"
-      retentionMS: "{{ kafka_topics_health_retentionMS | default(3600000) }}"
-    invoker:
-      segmentBytes: 536870912
-      retentionBytes: "{{ kafka_topics_invoker_retentionBytes | default(1073741824) }}"
-      retentionMS: "{{ kafka_topics_invoker_retentionMS | default(172800000) }}"
-    cacheInvalidation:
-      segmentBytes: 536870912
-      retentionBytes: "{{ kafka_topics_cacheInvalidation_retentionBytes | default(1073741824) }}"
-      retentionMS: "{{ kafka_topics_cacheInvalidation_retentionMS | default(300000) }}"
 
 kafka_connect_string: "{% set ret = [] %}\
                        {% for host in groups['kafkas'] %}\
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 6caa545536..82db5d3e3f 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -52,10 +52,16 @@
       "WHISK_VERSION_BUILDNO": "{{ docker.image.tag }}"
 
       "KAFKA_HOSTS": "{{ kafka_connect_string }}"
-      "KAFKA_TOPICS_COMPLETED_RETENTION_BYTES": "{{ kafka.topics.completed.retentionBytes }}"
-      "KAFKA_TOPICS_COMPLETED_RETENTION_MS": "{{ kafka.topics.completed.retentionMS }}"
-      "KAFKA_TOPICS_COMPLETED_SEGMENT_BYTES": "{{ kafka.topics.completed.segmentBytes }}"
-      "KAFKA_REPLICATIONFACTOR": "{{ kafka.replicationFactor }}"
+      "CONFIG_whisk_kafka_replicationFactor": "{{ kafka.replicationFactor | default() }}"
+      "CONFIG_whisk_kafka_topics_cacheInvalidation_retentionBytes": "{{ kafka_topics_cacheInvalidation_retentionBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_cacheInvalidation_retentionMs": "{{ kafka_topics_cacheInvalidation_retentionMS | default() }}"
+      "CONFIG_whisk_kafka_topics_cacheInvalidation_segmentBytes": "{{ kafka_topics_cacheInvalidation_segmentBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_completed_retentionBytes": "{{ kafka_topics_completed_retentionBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_completed_retentionMs": "{{ kafka_topics_completed_retentionMS | default() }}"
+      "CONFIG_whisk_kafka_topics_completed_segmentBytes": "{{ kafka_topics_completed_segmentBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_health_retentionBytes": "{{ kafka_topics_health_retentionBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_health_retentionMs": "{{ kafka_topics_health_retentionMS | default() }}"
+      "CONFIG_whisk_kafka_topics_health_segmentBytes": "{{ kafka_topics_health_segmentBytes | default() }}"
 
       "DB_PROTOCOL": "{{ db_protocol }}"
       "DB_PROVIDER": "{{ db_provider }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 112b37b80f..91d381fb89 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -118,10 +118,10 @@
         -e COMPONENT_NAME='invoker{{ groups['invokers'].index(inventory_hostname) }}'
         -e PORT='8080'
         -e KAFKA_HOSTS='{{ kafka_connect_string }}'
-        -e KAFKA_TOPICS_INVOKER_RETENTION_BYTES='{{ kafka.topics.invoker.retentionBytes }}'
-        -e KAFKA_TOPICS_INVOKER_RETENTION_MS='{{ kafka.topics.invoker.retentionMS }}'
-        -e KAFKA_TOPICS_INVOKER_SEGMENT_BYTES='{{ kafka.topics.invoker.segmentBytes }}'
-        -e KAFKA_REPLICATIONFACTOR='{{ kafka.replicationFactor }}'
+        -e CONFIG_whisk_kafka_replicationFactor='{{ kafka.replicationFactor | default() }}'
+        -e CONFIG_whisk_kafka_topics_invoker_retentionBytes='{{ kafka_topics_invoker_retentionBytes | default() }}'
+        -e CONFIG_whisk_kafka_topics_invoker_retentionMs='{{ kafka_topics_invoker_retentionMS | default() }}'
+        -e CONFIG_whisk_kakfa_topics_invoker_segmentBytes='{{ kafka_topics_invoker_segmentBytes | default() }}'
         -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
         -e DB_PROTOCOL='{{ db_protocol }}'
         -e DB_PROVIDER='{{ db_provider }}'
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index 3d5845b164..7c6fedae95 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -32,15 +32,3 @@
   until: (('[Kafka Server ' + (groups['kafkas'].index(inventory_hostname)|string) + '], started') in result.stdout)
   retries: 10
   delay: 5
-
-- name: create the health and the cacheInvalidation topic
-  shell: "docker exec kafka0 bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor {{ kafka.replicationFactor }} --partitions 1 --zookeeper $KAFKA_ZOOKEEPER_CONNECT --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
-  register: command_result
-  failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
-  changed_when: "'Created topic' in command_result.stdout"
-  with_items:
-  - name: health
-    settings: "{{ kafka.topics.health }}"
-  - name: cacheInvalidation
-    settings: "{{ kafka.topics.cacheInvalidation }}"
-  when: groups['kafkas'].index(inventory_hostname ) == 0
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 41b41cee6d..dc2d4b1708 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -47,4 +47,33 @@ kamon {
     modules {
         kamon-statsd.auto-start = yes
     }
-}
\ No newline at end of file
+}
+
+whisk {
+    # kafka related configuration
+    kafka {
+        replication-factor = 1
+        topics {
+            cache-invalidation {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 300000
+            }
+            completed {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 3600000
+            }
+            health {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 3600000
+            }
+            invoker {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 172800000
+            }
+        }
+    }
+}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 599b5bfe72..1cc1ea28dd 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -35,6 +35,19 @@ import whisk.core.connector.MessageConsumer
 import whisk.core.connector.MessageProducer
 import whisk.core.connector.MessagingProvider
 
+import pureconfig._
+
+case class KafkaConfig(replicationFactor: Short)
+
+case class TopicConfig(segmentBytes: Long, retentionBytes: Long, retentionMs: Long) {
+  def toMap: Map[String, String] = {
+    Map(
+      "retention.bytes" -> retentionBytes.toString,
+      "retention.ms" -> retentionMs.toString,
+      "segment.bytes" -> segmentBytes.toString)
+  }
+}
+
 /**
  * A Kafka based implementation of MessagingProvider
  */
@@ -46,15 +59,14 @@ object KafkaMessagingProvider extends MessagingProvider {
   def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
     new KafkaProducerConnector(config.kafkaHosts, ec)
 
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])(
-    implicit logging: Logging): Boolean = {
+  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
+    val kc = loadConfigOrThrow[KafkaConfig]("whisk.kafka")
+    val tc = loadConfigOrThrow[TopicConfig](s"whisk.kafka.topics.$topicConfig")
     val props = new Properties
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
     val client = AdminClient.create(props)
-    val numPartitions = topicConfig.getOrElse("numPartitions", "1").toInt
-    val replicationFactor = topicConfig.getOrElse("replicationFactor", "1").toShort
-    val nt = new NewTopic(topic, numPartitions, replicationFactor)
-      .configs((topicConfig - ("numPartitions", "replicationFactor")).asJava)
+    val numPartitions = 1
+    val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.toMap.asJava)
     val results = client.createTopics(List(nt).asJava)
     try {
       results.values().get(topic).get()
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 36c17afe0f..96cf1bc5ab 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -96,14 +96,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val dbActivations = this(WhiskConfig.dbActivations)
   val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint)
 
-  val kafkaTopicsInvokerRetentionBytes = this(WhiskConfig.kafkaTopicsInvokerRetentionBytes)
-  val kafkaTopicsInvokerRetentionMS = this(WhiskConfig.kafkaTopicsInvokerRetentionMS)
-  val kafkaTopicsInvokerSegmentBytes = this(WhiskConfig.kafkaTopicsInvokerSegmentBytes)
-  val kafkaTopicsCompletedRetentionBytes = this(WhiskConfig.kafkaTopicsCompletedRetentionBytes)
-  val kafkaTopicsCompletedRetentionMS = this(WhiskConfig.kafkaTopicsCompletedRetentionMS)
-  val kafkaTopicsCompletedSegmentBytes = this(WhiskConfig.kafkaTopicsCompletedSegmentBytes)
-  val kafkaReplicationFactor = this(WhiskConfig.kafkaReplicationFactor)
-
   val runtimesManifest = this(WhiskConfig.runtimesManifest)
   val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
   val actionInvokeConcurrentLimit = this(WhiskConfig.actionInvokeConcurrentLimit)
@@ -242,14 +234,6 @@ object WhiskConfig {
 
   val runtimesManifest = "runtimes.manifest"
 
-  val kafkaTopicsInvokerRetentionBytes = "kafka.topics.invoker.retentionBytes"
-  val kafkaTopicsInvokerRetentionMS = "kafka.topics.invoker.retentionMS"
-  val kafkaTopicsInvokerSegmentBytes = "kafka.topics.invoker.segmentBytes"
-  val kafkaTopicsCompletedRetentionBytes = "kafka.topics.completed.retentionBytes"
-  val kafkaTopicsCompletedRetentionMS = "kafka.topics.completed.retentionMS"
-  val kafkaTopicsCompletedSegmentBytes = "kafka.topics.completed.segmentBytes"
-  val kafkaReplicationFactor = "kafka.replicationFactor"
-
   val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
   val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
   val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
index 7772723317..1737624611 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -34,6 +34,5 @@ trait MessagingProvider extends Spi {
                   maxPeek: Int = Int.MaxValue,
                   maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging): MessageConsumer
   def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])(
-    implicit logging: Logging): Boolean
+  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean
 }
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 80a6a90fba..08b8f1d544 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -212,17 +212,15 @@ object Controller {
     }
 
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(
-          config,
-          "completed" + instance,
-          Map(
-            "numPartitions" -> "1",
-            "replicationFactor" -> config.kafkaReplicationFactor,
-            "retention.bytes" -> config.kafkaTopicsCompletedRetentionBytes,
-            "retention.ms" -> config.kafkaTopicsCompletedRetentionMS,
-            "segment.bytes" -> config.kafkaTopicsCompletedSegmentBytes))) {
+    if (!msgProvider.ensureTopic(config, topic = "completed" + instance, topicConfig = "completed")) {
       abort(s"failure during msgProvider.ensureTopic for topic completed$instance")
     }
+    if (!msgProvider.ensureTopic(config, topic = "health", topicConfig = "health")) {
+      abort(s"failure during msgProvider.ensureTopic for topic health")
+    }
+    if (!msgProvider.ensureTopic(config, topic = "cacheInvalidation", topicConfig = "cache-invalidation")) {
+      abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation")
+    }
 
     ExecManifest.initialize(config) match {
       case Success(_) =>
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 5be950df3d..e3776aea53 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -45,7 +45,13 @@ import whisk.core.connector.MessageProducer
 import whisk.core.connector.MessagingProvider
 import whisk.core.database.NoDocumentException
 import whisk.core.entity._
-import whisk.core.entity.size._
+import whisk.core.entity.{ActivationId, WhiskActivation}
+import whisk.core.entity.EntityName
+import whisk.core.entity.ExecutableWhiskActionMetaData
+import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
+import whisk.core.entity.UUID
+import whisk.core.entity.WhiskAction
 import whisk.core.entity.types.EntityStore
 import whisk.spi.SpiLoader
 import pureconfig._
@@ -345,11 +351,6 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
 object LoadBalancerService {
   def requiredProperties =
     kafkaHosts ++
-      Map(
-        kafkaTopicsCompletedRetentionBytes -> 1024.MB.toBytes.toString,
-        kafkaTopicsCompletedRetentionMS -> 1.hour.toMillis.toString,
-        kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString,
-        kafkaReplicationFactor -> "1") ++
       Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
 
   /** Memoizes the result of `f` for later use. */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1a71e29379..77e3da82ae 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -43,7 +43,6 @@ import whisk.core.entity.ExecManifest
 import whisk.core.entity.InstanceId
 import whisk.core.entity.WhiskActivationStore
 import whisk.core.entity.WhiskEntityStore
-import whisk.core.entity.size._
 import whisk.http.BasicHttpService
 import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
@@ -62,11 +61,6 @@ object Invoker {
       WhiskEntityStore.requiredProperties ++
       WhiskActivationStore.requiredProperties ++
       kafkaHosts ++
-      Map(
-        kafkaTopicsInvokerRetentionBytes -> 1024.MB.toBytes.toString,
-        kafkaTopicsInvokerRetentionMS -> 48.hour.toMillis.toString,
-        kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString,
-        kafkaReplicationFactor -> "1") ++
       zookeeperHosts ++
       wskApiHost ++ Map(
       dockerImageTag -> "latest",
@@ -188,15 +182,7 @@ object Invoker {
 
     val invokerInstance = InstanceId(assignedInvokerId)
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(
-          config,
-          "invoker" + assignedInvokerId,
-          Map(
-            "numPartitions" -> "1",
-            "replicationFactor" -> config.kafkaReplicationFactor,
-            "retention.bytes" -> config.kafkaTopicsInvokerRetentionBytes,
-            "retention.ms" -> config.kafkaTopicsInvokerRetentionMS,
-            "segment.bytes" -> config.kafkaTopicsInvokerSegmentBytes))) {
+    if (!msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker")) {
       abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
     }
     val producer = msgProvider.getProducer(config, ec)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services