You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/11/28 10:45:25 UTC

[GitHub] jeremiaswerner closed pull request #2744: Deploy kafka & zookeeper cluster with ansible

jeremiaswerner closed pull request #2744: Deploy kafka & zookeeper cluster with ansible
URL: https://github.com/apache/incubator-openwhisk/pull/2744
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub no longer displays the original diff once the pull request is merged):

diff --git a/ansible/environments/distributed/hosts b/ansible/environments/distributed/hosts
index 0880e6d77c..9c9045b01e 100755
--- a/ansible/environments/distributed/hosts
+++ b/ansible/environments/distributed/hosts
@@ -18,9 +18,12 @@ edge
 [controllers]
 10.3.2.155              ansible_host=10.3.2.155
 
-[kafka]
+[kafkas]
 10.3.2.156              ansible_host=10.3.2.156
 
+[zookeepers:children]
+kafkas
+
 [invokers]
 10.3.2.158              ansible_host=10.3.2.158
 10.3.2.159              ansible_host=10.3.2.159
diff --git a/ansible/environments/docker-machine/hosts.j2.ini b/ansible/environments/docker-machine/hosts.j2.ini
index 4301576768..a4990ae359 100644
--- a/ansible/environments/docker-machine/hosts.j2.ini
+++ b/ansible/environments/docker-machine/hosts.j2.ini
@@ -10,9 +10,12 @@ ansible ansible_connection=local
 controller0                 ansible_host={{ docker_machine_ip }}
 controller1                 ansible_host={{ docker_machine_ip }}
 
-[kafka]
+[kafkas]
 {{ docker_machine_ip }}     ansible_host={{ docker_machine_ip }}
 
+[zookeepers:children]
+kafkas
+
 [invokers]
 invoker0                    ansible_host={{ docker_machine_ip }}
 invoker1                    ansible_host={{ docker_machine_ip }}
diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all
index 3860f33a2a..c3b3c12160 100755
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -23,3 +23,12 @@ controller_arguments: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxre
 invoker_arguments: "{{ controller_arguments }}"
 
 invoker_allow_multiple_instances: true
+
+# Set kafka topic retention
+kafka_heap: '512m'
+kafka_topics_completed_retentionBytes: 104857600
+kafka_topics_completed_retentionMS: 300000
+kafka_topics_health_retentionBytes: 104857600
+kafka_topics_health_retentionMS: 300000
+kafka_topics_invoker_retentionBytes: 104857600
+kafka_topics_invoker_retentionMS: 300000
diff --git a/ansible/environments/local/hosts b/ansible/environments/local/hosts
index 00702f397d..516340861a 100644
--- a/ansible/environments/local/hosts
+++ b/ansible/environments/local/hosts
@@ -10,8 +10,12 @@ ansible ansible_connection=local
 controller0         ansible_host=172.17.0.1 ansible_connection=local
 controller1         ansible_host=172.17.0.1 ansible_connection=local
 
-[kafka]
-172.17.0.1          ansible_host=172.17.0.1 ansible_connection=local
+[kafkas]
+kafka0              ansible_host=172.17.0.1 ansible_connection=local
+kafka1              ansible_host=172.17.0.1 ansible_connection=local
+
+[zookeepers:children]
+kafkas
 
 [invokers]
 invoker0            ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 02c2b08ad1..f1a746d5b1 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -129,33 +129,46 @@ registry:
   confdir: "{{ config_root_dir }}/registry"
 
 kafka:
-  version: 0.10.2.1
+  version: 0.11.0.1
   port: 9092
   ras:
-    port: 9093
+    port: 8093
   heap: "{{ kafka_heap | default('1g') }}"
+  replicationFactor: "{{ kafka_replicationFactor | default((groups['kafkas']|length)|int) }}"
   topics:
     completed:
       segmentBytes: 536870912
       retentionBytes: "{{ kafka_topics_completed_retentionBytes | default(1073741824) }}"
-      retentionMS: 3600000
+      retentionMS: "{{ kafka_topics_completed_retentionMS | default(3600000) }}"
     health:
       segmentBytes: 536870912
       retentionBytes: "{{ kafka_topics_health_retentionBytes | default(1073741824) }}"
-      retentionMS: 3600000
+      retentionMS: "{{ kafka_topics_health_retentionMS | default(3600000) }}"
     invoker:
       segmentBytes: 536870912
       retentionBytes: "{{ kafka_topics_invoker_retentionBytes | default(1073741824) }}"
-      retentionMS: 172800000
+      retentionMS: "{{ kafka_topics_invoker_retentionMS | default(172800000) }}"
     cacheInvalidation:
       segmentBytes: 536870912
-      retentionBytes: 1073741824
-      retentionMS: 300000
+      retentionBytes: "{{ kafka_topics_cacheInvalidation_retentionBytes | default(1073741824) }}"
+      retentionMS: "{{ kafka_topics_cacheInvalidation_retentionMS | default(300000) }}" 
+
+kafka_connect_string: "{% set ret = [] %}\
+                       {% for host in groups['kafkas'] %}\
+                         {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.port+loop.index-1)|string) ) }}\
+                       {% endfor %}\
+                       {{ ret | join(',') }}"
 
 zookeeper:
   version: 3.4
   port: 2181
 
+zookeeper_connect_string: "{% set ret = [] %}\
+                           {% for host in groups['zookeepers'] %}\
+                             {{ ret.append( hostvars[host].ansible_host + ':' + ((zookeeper.port+loop.index-1)|string) ) }}\
+                           {% endfor %}\
+                           {{ ret | join(',') }}"
+
 invoker:
   port: 12001
   heap: "{{ invoker_heap | default('2g') }}"
diff --git a/ansible/kafka.yml b/ansible/kafka.yml
index 1b011b5ae3..fba52f2627 100644
--- a/ansible/kafka.yml
+++ b/ansible/kafka.yml
@@ -1,6 +1,10 @@
 ---
 # This playbook deploys an Openwhisk Kafka bus.  
 
-- hosts: kafka
+- hosts: zookeepers
+  roles:
+  - zookeeper
+
+- hosts: kafkas
   roles:
   - kafka
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 3978316a07..2aa0b56b73 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -51,11 +51,11 @@
       "WHISK_VERSION_DATE": "{{ whisk.version.date }}"
       "WHISK_VERSION_BUILDNO": "{{ docker.image.tag }}"
 
-      "KAFKA_HOST": "{{ groups['kafka']|first }}"
-      "KAFKA_HOST_PORT": "{{ kafka.port }}"
+      "KAFKA_HOSTS": "{{ kafka_connect_string }}"
       "KAFKA_TOPICS_COMPLETED_RETENTION_BYTES": "{{ kafka.topics.completed.retentionBytes }}"
       "KAFKA_TOPICS_COMPLETED_RETENTION_MS": "{{ kafka.topics.completed.retentionMS }}"
       "KAFKA_TOPICS_COMPLETED_SEGMENT_BYTES": "{{ kafka.topics.completed.segmentBytes }}"
+      "KAFKA_REPLICATIONFACTOR": "{{ kafka.replicationFactor }}"
 
       "DB_PROTOCOL": "{{ db_protocol }}"
       "DB_PROVIDER": "{{ db_provider }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 94c06a3d2d..56b810015f 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -117,13 +117,12 @@
         -e INVOKER_OPTS='{{ invoker.arguments }}'
         -e COMPONENT_NAME='invoker{{ groups['invokers'].index(inventory_hostname) }}'
         -e PORT='8080'
-        -e KAFKA_HOST='{{ groups['kafka']|first }}'
-        -e KAFKA_HOST_PORT='{{ kafka.port }}'
+        -e KAFKA_HOSTS='{{ kafka_connect_string }}'
         -e KAFKA_TOPICS_INVOKER_RETENTION_BYTES='{{ kafka.topics.invoker.retentionBytes }}'
         -e KAFKA_TOPICS_INVOKER_RETENTION_MS='{{ kafka.topics.invoker.retentionMS }}'
         -e KAFKA_TOPICS_INVOKER_SEGMENT_BYTES='{{ kafka.topics.invoker.segmentBytes }}'
-        -e ZOOKEEPER_HOST='{{ groups['kafka']|first }}'
-        -e ZOOKEEPER_HOST_PORT='{{ zookeeper.port }}'
+        -e KAFKA_REPLICATIONFACTOR='{{ kafka.replicationFactor }}'
+        -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
         -e DB_PROTOCOL='{{ db_protocol }}'
         -e DB_PROVIDER='{{ db_provider }}'
         -e DB_HOST='{{ db_host }}'
diff --git a/ansible/roles/kafka/tasks/clean.yml b/ansible/roles/kafka/tasks/clean.yml
index fa1db75d0f..b9d593381c 100644
--- a/ansible/roles/kafka/tasks/clean.yml
+++ b/ansible/roles/kafka/tasks/clean.yml
@@ -1,7 +1,7 @@
 ---
 # Remove kafka and zookeeper containers.
 
-- name: remove kafka
+- name: remove old kafka
   docker_container:
     name: kafka
     image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
@@ -9,10 +9,10 @@
     state: absent
   ignore_errors: True
 
-- name: remove zookeeper
+- name: remove kafka
   docker_container:
-    name: zookeeper
-    image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
+    name: kafka{{ groups['kafkas'].index(inventory_hostname) }}
+    image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
     keep_volumes: False
     state: absent
   ignore_errors: True
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index f410cc3502..3d5845b164 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -1,57 +1,40 @@
 ---
 # This role will install Kafka with Zookeeper in group 'kafka' in the environment inventory
 
-- name: "pull the zookeeper:{{ zookeeper.version }} image"
-  shell: "docker pull zookeeper:{{ zookeeper.version }}"
-  retries: "{{ docker.pull.retries }}"
-  delay: "{{ docker.pull.delay }}"
-
-- name: (re)start zookeeper
-  docker_container:
-    name: zookeeper
-    image: zookeeper:{{ zookeeper.version }}
-    state: started
-    recreate: true
-    restart_policy: "{{ docker.restart.policy }}"
-    ports:
-      - "{{ zookeeper.port }}:2181"
-
-- name: wait until the Zookeeper in this host is up and running
-  action: shell (echo ruok; sleep 1) | nc {{ ansible_host }} {{ zookeeper.port }}
-  register: result
-  until: (result.rc == 0) and (result.stdout == 'imok')
-  retries: 36
-  delay: 5
-
-- name: "pull the ches/kafka:{{ kafka.version }} image"
-  shell: "docker pull ches/kafka:{{ kafka.version }}"
+- name: "pull the wurstmeister/kafka:{{ kafka.version }} image"
+  shell: "docker pull wurstmeister/kafka:{{ kafka.version }}"
   retries: "{{ docker.pull.retries }}"
   delay: "{{ docker.pull.delay }}"
 
 - name: (re)start kafka
+  vars:
+    zookeeper_idx: "{{ groups['kafkas'].index(inventory_hostname) % (groups['zookeepers'] | length) }}"
   docker_container:
-    name: kafka
-    image: ches/kafka:{{ kafka.version }}
+    name: kafka{{ groups['kafkas'].index(inventory_hostname) }}
+    image: wurstmeister/kafka:{{ kafka.version }}
     state: started
     recreate: true
     restart_policy: "{{ docker.restart.policy }}"
-    links:
-      - "zookeeper:zookeeper"
     env:
+      "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+      "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
       "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
+      "KAFKA_ADVERTISED_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
       "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
+      "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
+      "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
     ports:
-      - "{{ kafka.port }}:9092"
+      - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
 
 - name: wait until the kafka server started up
-  shell: docker logs kafka
+  shell: docker logs kafka{{ groups['kafkas'].index(inventory_hostname) }}
   register: result
-  until: ('[Kafka Server 0], started' in result.stdout)
+  until: (('[Kafka Server ' + (groups['kafkas'].index(inventory_hostname)|string) + '], started') in result.stdout)
   retries: 10
   delay: 5
 
 - name: create the health and the cacheInvalidation topic
-  shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
+  shell: "docker exec kafka0 bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor {{ kafka.replicationFactor }} --partitions 1 --zookeeper $KAFKA_ZOOKEEPER_CONNECT --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
   register: command_result
   failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
   changed_when: "'Created topic' in command_result.stdout"
@@ -60,3 +43,4 @@
     settings: "{{ kafka.topics.health }}"
   - name: cacheInvalidation
     settings: "{{ kafka.topics.cacheInvalidation }}"
+  when: groups['kafkas'].index(inventory_hostname ) == 0
diff --git a/ansible/roles/zookeeper/tasks/clean.yml b/ansible/roles/zookeeper/tasks/clean.yml
new file mode 100644
index 0000000000..df82ab100c
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/clean.yml
@@ -0,0 +1,18 @@
+---
+# Remove zookeeper containers (both the legacy single container and the per-host numbered one).
+
+- name: remove old zookeeper
+  docker_container:
+    name: zookeeper
+    image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
+    keep_volumes: False
+    state: absent
+  ignore_errors: True
+
+- name: remove zookeeper
+  docker_container:
+    name: zookeeper{{ groups['zookeepers'].index(inventory_hostname) }}
+    image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
+    keep_volumes: False
+    state: absent
+  ignore_errors: True
diff --git a/ansible/roles/zookeeper/tasks/deploy.yml b/ansible/roles/zookeeper/tasks/deploy.yml
new file mode 100644
index 0000000000..9d88911a36
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/deploy.yml
@@ -0,0 +1,37 @@
+---
+# This role will install Zookeeper on the hosts in group 'zookeepers' in the environment inventory
+
+- name: "pull the zookeeper:{{ zookeeper.version }} image"
+  shell: "docker pull zookeeper:{{ zookeeper.version }}"
+  retries: "{{ docker.pull.retries }}"
+  delay: "{{ docker.pull.delay }}"
+
+- name: (re)start zookeeper
+  docker_container:
+    name: zookeeper{{ groups['zookeepers'].index(inventory_hostname) }}
+    image: zookeeper:{{ zookeeper.version }}
+    state: started
+    recreate: true
+    restart_policy: "{{ docker.restart.policy }}"
+    env:
+        ZOO_MY_ID: "{{ groups['zookeepers'].index(inventory_hostname) + 1 }}"
+        ZOO_SERVERS: "{% set zhosts = [] %}
+                      {% for host in groups['zookeepers'] %}
+                        {% if host == inventory_hostname %}
+                          {{ zhosts.append('server.' + (loop.index|string) + '=' + '0.0.0.0:2888:3888') }}
+                        {% else %}
+                          {{ zhosts.append('server.' + (loop.index|string) + '=' + hostvars[host].ansible_host + ':' + ((2888+loop.index-1)|string) + ':' + ((3888+loop.index-1)|string) ) }}
+                        {% endif %}
+                      {% endfor %}
+                      {{ zhosts | join(' ') }}"
+    ports:
+      - "{{ zookeeper.port + groups['zookeepers'].index(inventory_hostname) }}:2181"
+      - "{{ 2888 + groups['zookeepers'].index(inventory_hostname) }}:2888"
+      - "{{ 3888 + groups['zookeepers'].index(inventory_hostname) }}:3888"
+
+- name: wait until the Zookeeper in this host is up and running
+  action: shell (echo ruok; sleep 1) | nc {{ ansible_host }} {{ zookeeper.port + groups['zookeepers'].index(inventory_hostname) }}
+  register: result
+  until: (result.rc == 0) and (result.stdout == 'imok')
+  retries: 36
+  delay: 5
diff --git a/ansible/roles/zookeeper/tasks/main.yml b/ansible/roles/zookeeper/tasks/main.yml
new file mode 100644
index 0000000000..0140e39bf2
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/main.yml
@@ -0,0 +1,10 @@
+---
+# This role will install zookeeper on the hosts in group 'zookeepers' in the environment inventory
+# In deploy mode it will deploy the zookeeper containers.
+# In clean mode it will remove the zookeeper containers.
+
+- include: deploy.yml
+  when: mode == "deploy"
+
+- include: clean.yml
+  when: mode == "clean"
\ No newline at end of file
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index ecef131f90..8ae7b8c0b0 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -42,18 +42,16 @@ limits.triggers.fires.perMinute={{ limits.firesPerMinute }}
 limits.actions.sequence.maxLength={{ limits.sequenceMaxLength }}
 
 edge.host={{ groups["edge"]|first }}
-kafka.host={{ groups["kafka"]|first }}
+kafka.hosts={{ kafka_connect_string }}
 redis.host={{ groups["redis"]|default([""])|first }}
 router.host={{ groups["edge"]|first }}
-zookeeper.host={{ groups["kafka"]|first }}
+zookeeper.hosts={{ zookeeper_connect_string }}
 invoker.hosts={{ groups["invokers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
 
 edge.host.apiport=443
-zookeeper.host.port={{ zookeeper.port }}
-kafka.host.port={{ kafka.port }}
 kafkaras.host.port={{ kafka.ras.port }}
 redis.host.port={{ redis.port }}
-invoker.hosts.baseport={{ invoker.port }}
+invoker.hosts.basePort={{ invoker.port }}
 
 controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
 controller.host.basePort={{ controller.basePort }}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index fc6b593044..5c61cc6451 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -41,15 +41,15 @@ import whisk.core.connector.MessagingProvider
 object KafkaMessagingProvider extends MessagingProvider {
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging): MessageConsumer =
-    new KafkaConsumerConnector(config.kafkaHost, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
+    new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
 
   def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
-    new KafkaProducerConnector(config.kafkaHost, ec)
+    new KafkaProducerConnector(config.kafkaHosts, ec)
 
   def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])(
     implicit logging: Logging): Boolean = {
     val props = new Properties
-    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHost)
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
     val client = AdminClient.create(props)
     val numPartitions = topicConfig.getOrElse("numPartitions", "1").toInt
     val replicationFactor = topicConfig.getOrElse("replicationFactor", "1").toShort
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 8e8289d759..18b38249bd 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.errors.NotLeaderForPartitionException
 import org.apache.kafka.common.serialization.StringSerializer
 import whisk.common.Counter
 import whisk.common.Logging
@@ -36,7 +37,7 @@ import whisk.core.connector.Message
 import whisk.core.connector.MessageProducer
 import whisk.core.entity.UUIDs
 
-class KafkaProducerConnector(kafkahost: String,
+class KafkaProducerConnector(kafkahosts: String,
                              implicit val executionContext: ExecutionContext,
                              id: String = UUIDs.randomUUID().toString)(implicit logging: Logging)
     extends MessageProducer {
@@ -44,12 +45,13 @@ class KafkaProducerConnector(kafkahost: String,
   override def sentCount() = sentCounter.cur
 
   /** Sends msg to topic. This is an asynchronous operation. */
-  override def send(topic: String, msg: Message): Future[RecordMetadata] = {
+  override def send(topic: String, msg: Message, retry: Int = 2): Future[RecordMetadata] = {
     implicit val transid = msg.transid
     val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
 
     logging.debug(this, s"sending to topic '$topic' msg '$msg'")
     val produced = Promise[RecordMetadata]()
+
     producer.send(record, new Callback {
       override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
         if (exception == null) produced.success(metadata)
@@ -63,6 +65,13 @@ class KafkaProducerConnector(kafkahost: String,
         sentCounter.next()
       case Failure(t) =>
         logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")
+    } recoverWith {
+      case t: NotLeaderForPartitionException =>
+        if (retry > 0) {
+          logging.error(this, s"NotLeaderForPartitionException is retryable, remain $retry retry")
+          Thread.sleep(100)
+          send(topic, msg, retry - 1)
+        } else produced.future
     }
   }
 
@@ -76,7 +85,7 @@ class KafkaProducerConnector(kafkahost: String,
 
   private def getProps: Properties = {
     val props = new Properties
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
     props.put(ProducerConfig.ACKS_CONFIG, 1.toString)
     props
   }
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7a724d0568..c08fcf13f1 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -75,14 +75,14 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val controllerInstances = this(WhiskConfig.controllerInstances)
 
   val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort)
-  val kafkaHost = this(WhiskConfig.kafkaHostName) + ":" + this(WhiskConfig.kafkaHostPort)
+  val kafkaHosts = this(WhiskConfig.kafkaHostList)
   val redisHostName = this(WhiskConfig.redisHostName)
   val redisHostPort = this(WhiskConfig.redisHostPort)
 
   val edgeHostName = this(WhiskConfig.edgeHostName)
 
-  val zookeeperHost = this(WhiskConfig.zookeeperHostName) + ":" + this(WhiskConfig.zookeeperHostPort)
   val invokerHosts = this(WhiskConfig.invokerHostsList)
+  val zookeeperHosts = this(WhiskConfig.zookeeperHostList)
 
   val dbProvider = this(WhiskConfig.dbProvider)
   val dbUsername = this(WhiskConfig.dbUsername)
@@ -103,6 +103,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val kafkaTopicsCompletedRetentionBytes = this(WhiskConfig.kafkaTopicsCompletedRetentionBytes)
   val kafkaTopicsCompletedRetentionMS = this(WhiskConfig.kafkaTopicsCompletedRetentionMS)
   val kafkaTopicsCompletedSegmentBytes = this(WhiskConfig.kafkaTopicsCompletedSegmentBytes)
+  val kafkaReplicationFactor = this(WhiskConfig.kafkaReplicationFactor)
 
   val runtimesManifest = this(WhiskConfig.runtimesManifest)
   val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
@@ -224,20 +225,19 @@ object WhiskConfig {
 
   val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
 
-  val kafkaHostName = "kafka.host"
-  val zookeeperHostName = "zookeeper.host"
+  val kafkaHostList = "kafka.hosts"
+  val zookeeperHostList = "zookeeper.hosts"
   val redisHostName = "redis.host"
 
   private val edgeHostApiPort = "edge.host.apiport"
-  val kafkaHostPort = "kafka.host.port"
   val redisHostPort = "redis.host.port"
-  val zookeeperHostPort = "zookeeper.host.port"
 
   val invokerHostsList = "invoker.hosts"
 
   val edgeHost = Map(edgeHostName -> null, edgeHostApiPort -> null)
   val invokerHosts = Map(invokerHostsList -> null)
-  val kafkaHost = Map(kafkaHostName -> null, kafkaHostPort -> null)
+  val kafkaHosts = Map(kafkaHostList -> null)
+  val zookeeperHosts = Map(zookeeperHostList -> null)
 
   val runtimesManifest = "runtimes.manifest"
 
@@ -247,6 +247,7 @@ object WhiskConfig {
   val kafkaTopicsCompletedRetentionBytes = "kafka.topics.completed.retentionBytes"
   val kafkaTopicsCompletedRetentionMS = "kafka.topics.completed.retentionMS"
   val kafkaTopicsCompletedSegmentBytes = "kafka.topics.completed.segmentBytes"
+  val kafkaReplicationFactor = "kafka.replicationFactor"
 
   val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
   val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
index 53d8e2f429..de37c33917 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
@@ -27,7 +27,7 @@ trait MessageProducer {
   def sentCount(): Long
 
   /** Sends msg to topic. This is an asynchronous operation. */
-  def send(topic: String, msg: Message): Future[RecordMetadata]
+  def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata]
 
   /** Closes producer. */
   def close(): Unit
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index d41a0dc1a8..80a6a90fba 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -217,7 +217,7 @@ object Controller {
           "completed" + instance,
           Map(
             "numPartitions" -> "1",
-            "replicationFactor" -> "1",
+            "replicationFactor" -> config.kafkaReplicationFactor,
             "retention.bytes" -> config.kafkaTopicsCompletedRetentionBytes,
             "retention.ms" -> config.kafkaTopicsCompletedRetentionMS,
             "segment.bytes" -> config.kafkaTopicsCompletedSegmentBytes))) {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index e38f69c9af..be3bb0b09e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -348,11 +348,12 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
 
 object LoadBalancerService {
   def requiredProperties =
-    kafkaHost ++
+    kafkaHosts ++
       Map(
         kafkaTopicsCompletedRetentionBytes -> 1024.MB.toBytes.toString,
         kafkaTopicsCompletedRetentionMS -> 1.hour.toMillis.toString,
-        kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString) ++
+        kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString,
+        kafkaReplicationFactor -> "1") ++
       Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
 
   /** Memoizes the result of `f` for later use. */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1c6853a96c..1a71e29379 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -61,12 +61,13 @@ object Invoker {
       ExecManifest.requiredProperties ++
       WhiskEntityStore.requiredProperties ++
       WhiskActivationStore.requiredProperties ++
-      kafkaHost ++
+      kafkaHosts ++
       Map(
         kafkaTopicsInvokerRetentionBytes -> 1024.MB.toBytes.toString,
         kafkaTopicsInvokerRetentionMS -> 48.hour.toMillis.toString,
-        kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString) ++
-      Map(zookeeperHostName -> "", zookeeperHostPort -> "") ++
+        kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString,
+        kafkaReplicationFactor -> "1") ++
+      zookeeperHosts ++
       wskApiHost ++ Map(
       dockerImageTag -> "latest",
       invokerNumCore -> "4",
@@ -135,17 +136,17 @@ object Invoker {
         id
       }
       .getOrElse {
-        if (config.zookeeperHost.startsWith(":") || config.zookeeperHost.endsWith(":")) {
-          abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHost})")
+        if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
+          abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
         }
         val invokerName = cmdLineArgs.name.getOrElse(config.invokerName)
         if (invokerName.trim.isEmpty) {
           abort("Invoker name can't be empty to use dynamicId assignment.")
         }
 
-        logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHost}")
+        logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHosts}")
         val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
-        val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHost, retryPolicy)
+        val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHosts, retryPolicy)
         zkClient.start()
         zkClient.blockUntilConnected()
         logger.info(this, "invokerReg: connected to zookeeper")
@@ -192,7 +193,7 @@ object Invoker {
           "invoker" + assignedInvokerId,
           Map(
             "numPartitions" -> "1",
-            "replicationFactor" -> "1",
+            "replicationFactor" -> config.kafkaReplicationFactor,
             "retention.bytes" -> config.kafkaTopicsInvokerRetentionBytes,
             "retention.ms" -> config.kafkaTopicsInvokerRetentionMS,
             "segment.bytes" -> config.kafkaTopicsInvokerSegmentBytes))) {
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 781f0bf91e..0971b23f91 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -132,12 +132,8 @@ public static String getProperty(String string) {
         return whiskProperties.getProperty(string);
     }
 
-    public static String getKafkaHost() {
-        return whiskProperties.getProperty("kafka.host");
-    }
-
-    public static int getKafkaPort() {
-        return Integer.parseInt(whiskProperties.getProperty("kafka.host.port"));
+    public static String getKafkaHosts() {
+        return whiskProperties.getProperty("kafka.hosts");
     }
 
     public static int getKafkaMonitorPort() {
@@ -145,11 +141,7 @@ public static int getKafkaMonitorPort() {
     }
 
     public static String getZookeeperHost() {
-        return whiskProperties.getProperty("zookeeper.host");
-    }
-
-    public static int getZookeeperPort() {
-        return Integer.parseInt(whiskProperties.getProperty("zookeeper.host.port"));
+        return whiskProperties.getProperty("zookeeper.hosts");
     }
 
     public static String getMainDockerEndpoint() {
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 20592a5c38..8974be3c79 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -17,43 +17,43 @@
 
 package services
 
+import java.io.File
 import java.util.Calendar
 
 import scala.concurrent.Await
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 import scala.language.postfixOps
-
+import scala.util.Try
 import org.apache.kafka.clients.consumer.CommitFailedException
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
-
-import common.StreamLogging
-import common.WskActorSystem
+import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
 import whisk.common.TransactionId
 import whisk.connector.kafka.KafkaConsumerConnector
 import whisk.connector.kafka.KafkaProducerConnector
 import whisk.core.WhiskConfig
 import whisk.core.connector.Message
 import whisk.utils.ExecutionContextFactory
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging {
   implicit val transid = TransactionId.testing
   implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
 
-  val config = new WhiskConfig(WhiskConfig.kafkaHost)
+  val config = new WhiskConfig(WhiskConfig.kafkaHosts)
   assert(config.isValid)
 
   val groupid = "kafkatest"
   val topic = "Dinosaurs"
   val sessionTimeout = 10 seconds
   val maxPollInterval = 10 seconds
-  val producer = new KafkaProducerConnector(config.kafkaHost, ec)
+  val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
   val consumer = new KafkaConsumerConnector(
-    config.kafkaHost,
+    config.kafkaHosts,
     groupid,
     topic,
     sessionTimeout = sessionTimeout,
@@ -65,17 +65,34 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
     super.afterAll()
   }
 
+  def commandComponent(host: String, command: String, component: String) = {
+    def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
+    val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
+    val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort)
+    val cmd = Seq(docker, "--host", host + ":" + dockerPort, command, component)
+
+    TestUtils.runCmd(0, new File("."), cmd: _*)
+  }
+
+  def sendAndReceiveMessage(message: Message,
+                            waitForSend: FiniteDuration,
+                            waitForReceive: FiniteDuration): Iterable[String] = {
+    val start = java.lang.System.currentTimeMillis
+    val sent = Await.result(producer.send(topic, message), waitForSend)
+    val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
+    val end = java.lang.System.currentTimeMillis
+    val elapsed = end - start
+    println(s"Received ${received.size}. Took $elapsed msec: $received\n")
+
+    received
+  }
+
   behavior of "Kafka connector"
 
   it should "send and receive a kafka message which sets up the topic" in {
     for (i <- 0 until 5) {
       val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
-      val start = java.lang.System.currentTimeMillis
-      val sent = Await.result(producer.send(topic, message), 20 seconds)
-      val received = consumer.peek(10 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
-      val end = java.lang.System.currentTimeMillis
-      val elapsed = end - start
-      println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
+      val received = sendAndReceiveMessage(message, 20 seconds, 10 seconds)
       received.size should be >= 1
       received.last should be(message.serialize)
       consumer.commit()
@@ -85,12 +102,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   it should "send and receive a kafka message even after session timeout" in {
     for (i <- 0 until 4) {
       val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
-      val start = java.lang.System.currentTimeMillis
-      val sent = Await.result(producer.send(topic, message), 1 seconds)
-      val received = consumer.peek(1 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
-      val end = java.lang.System.currentTimeMillis
-      val elapsed = end - start
-      println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
+      val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
 
       // only the last iteration will have an updated cursor
       // iteration 0: get whatever is on the topic (at least 1 but may be more if a previous test failed)
@@ -112,4 +124,32 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
       } else consumer.commit()
     }
   }
+
+  it should "send and receive a kafka message even after shutdown one of instances" in {
+    val kafkaHosts = config.kafkaHosts.split(",")
+    if (kafkaHosts.length > 1) {
+      for (i <- 0 until kafkaHosts.length) {
+        val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+        val kafkaHost = kafkaHosts(i).split(":")(0)
+        val startLog = s", started"
+        val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
+
+        commandComponent(kafkaHost, "stop", s"kafka$i")
+        var received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
+        received.size should be(1)
+        consumer.commit()
+
+        commandComponent(kafkaHost, "start", s"kafka$i")
+        retry({
+          startLog.r
+            .findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout)
+            .length shouldBe prevCount + 1
+        }, 20, Some(1.second)) // wait until kafka is up
+
+        received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
+        received.size should be(1)
+        consumer.commit()
+      }
+    }
+  }
 }
diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index 652dcb1542..e3b9597a3f 100644
--- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -73,7 +73,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
   }
 
   private val producer = new MessageProducer {
-    def send(topic: String, msg: Message): Future[RecordMetadata] = {
+    def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata] = {
       queue.synchronized {
         if (queue.offer(msg)) {
           logging.info(this, s"put: $msg")
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index eaa288b5a8..b98b711e65 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -263,13 +263,13 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
   it should "be aborted when exceeding its memory limits" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
     val name = "TestNodeJsMemoryExceeding"
     assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
-      val allowedMemory = 256.megabytes
+      val allowedMemory = 128.megabytes
       val actionName = TestUtils.getTestActionFilename("memoryWithGC.js")
       (action, _) =>
         action.create(name, Some(actionName), memory = Some(allowedMemory))
     }
 
-    val run = wsk.action.invoke(name, Map("payload" -> 512.toJson))
+    val run = wsk.action.invoke(name, Map("payload" -> 256.toJson))
     withActivation(wsk.activation, run) {
       _.response.result.get.fields("error") shouldBe Messages.memoryExhausted.toJson
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services