Posted to commits@openwhisk.apache.org by je...@apache.org on 2017/11/28 10:45:24 UTC

[incubator-openwhisk] branch master updated: Deploy kafka & zookeeper cluster with ansible (#2744)

This is an automated email from the ASF dual-hosted git repository.

jeremiaswerner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 21d37a1  Deploy kafka & zookeeper cluster with ansible (#2744)
21d37a1 is described below

commit 21d37a1af329544e245d376ad613d1a603fc104c
Author: Sang Heon Lee <de...@gmail.com>
AuthorDate: Tue Nov 28 19:45:21 2017 +0900

    Deploy kafka & zookeeper cluster with ansible (#2744)
    
    * Deploy kafka cluster with ansible
    
    - Enable deploying the zookeeper & kafka clusters on separate hosts
    - Use the kafka cluster size as the default replicationFactor
    - Add an ansible task to remove legacy kafka & zookeeper containers
    - Add a shutdown test for the kafka cluster
    - Reduce the default kafka heap size to 0.5g
    - Make kafkaReplicationFactor configurable
    - Increase the kafka timeout in tests
    - Wait until kafka is up in the kafka shutdown test
    - Reduce memory usage in ActionLimitsTests
    - Bump the kafka broker version to 0.11.0.1
    - Add a sleep between kafka send retries
---
 ansible/environments/distributed/hosts             |  5 +-
 ansible/environments/docker-machine/hosts.j2.ini   |  5 +-
 ansible/environments/local/group_vars/all          |  9 +++
 ansible/environments/local/hosts                   |  8 ++-
 ansible/group_vars/all                             | 27 ++++++--
 ansible/kafka.yml                                  |  6 +-
 ansible/roles/controller/tasks/deploy.yml          |  4 +-
 ansible/roles/invoker/tasks/deploy.yml             |  7 +-
 ansible/roles/kafka/tasks/clean.yml                |  8 +--
 ansible/roles/kafka/tasks/deploy.yml               | 48 +++++--------
 ansible/roles/{kafka => zookeeper}/tasks/clean.yml |  8 +--
 ansible/roles/zookeeper/tasks/deploy.yml           | 37 ++++++++++
 ansible/roles/zookeeper/tasks/main.yml             | 10 +++
 ansible/templates/whisk.properties.j2              |  8 +--
 .../connector/kafka/KafkaMessagingProvider.scala   |  6 +-
 .../connector/kafka/KafkaProducerConnector.scala   | 15 +++-
 .../src/main/scala/whisk/core/WhiskConfig.scala    | 15 ++--
 .../whisk/core/connector/MessageProducer.scala     |  2 +-
 .../scala/whisk/core/controller/Controller.scala   |  2 +-
 .../core/loadBalancer/LoadBalancerService.scala    |  5 +-
 .../main/scala/whisk/core/invoker/Invoker.scala    | 17 ++---
 tests/src/test/scala/common/WhiskProperties.java   | 14 +---
 .../test/scala/services/KafkaConnectorTests.scala  | 80 ++++++++++++++++------
 .../whisk/core/connector/test/TestConnector.scala  |  2 +-
 .../whisk/core/limits/ActionLimitsTests.scala      |  4 +-
 25 files changed, 230 insertions(+), 122 deletions(-)

diff --git a/ansible/environments/distributed/hosts b/ansible/environments/distributed/hosts
index 0880e6d..9c9045b 100755
--- a/ansible/environments/distributed/hosts
+++ b/ansible/environments/distributed/hosts
@@ -18,9 +18,12 @@ edge
 [controllers]
 10.3.2.155              ansible_host=10.3.2.155
 
-[kafka]
+[kafkas]
 10.3.2.156              ansible_host=10.3.2.156
 
+[zookeepers:children]
+kafkas
+
 [invokers]
 10.3.2.158              ansible_host=10.3.2.158
 10.3.2.159              ansible_host=10.3.2.159
diff --git a/ansible/environments/docker-machine/hosts.j2.ini b/ansible/environments/docker-machine/hosts.j2.ini
index 4301576..a4990ae 100644
--- a/ansible/environments/docker-machine/hosts.j2.ini
+++ b/ansible/environments/docker-machine/hosts.j2.ini
@@ -10,9 +10,12 @@ ansible ansible_connection=local
 controller0                 ansible_host={{ docker_machine_ip }}
 controller1                 ansible_host={{ docker_machine_ip }}
 
-[kafka]
+[kafkas]
 {{ docker_machine_ip }}     ansible_host={{ docker_machine_ip }}
 
+[zookeepers:children]
+kafkas
+
 [invokers]
 invoker0                    ansible_host={{ docker_machine_ip }}
 invoker1                    ansible_host={{ docker_machine_ip }}
diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all
index 3860f33..c3b3c12 100755
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -23,3 +23,12 @@ controller_arguments: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxre
 invoker_arguments: "{{ controller_arguments }}"
 
 invoker_allow_multiple_instances: true
+
+# Set kafka topic retention
+kafka_heap: '512m'
+kafka_topics_completed_retentionBytes: 104857600
+kafka_topics_completed_retentionMS: 300000
+kafka_topics_health_retentionBytes: 104857600
+kafka_topics_health_retentionMS: 300000
+kafka_topics_invoker_retentionBytes: 104857600
+kafka_topics_invoker_retentionMS: 300000
diff --git a/ansible/environments/local/hosts b/ansible/environments/local/hosts
index 00702f3..5163408 100644
--- a/ansible/environments/local/hosts
+++ b/ansible/environments/local/hosts
@@ -10,8 +10,12 @@ ansible ansible_connection=local
 controller0         ansible_host=172.17.0.1 ansible_connection=local
 controller1         ansible_host=172.17.0.1 ansible_connection=local
 
-[kafka]
-172.17.0.1          ansible_host=172.17.0.1 ansible_connection=local
+[kafkas]
+kafka0              ansible_host=172.17.0.1 ansible_connection=local
+kafka1              ansible_host=172.17.0.1 ansible_connection=local
+
+[zookeepers:children]
+kafkas
 
 [invokers]
 invoker0            ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 02c2b08..f1a746d 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -129,33 +129,46 @@ registry:
   confdir: "{{ config_root_dir }}/registry"
 
 kafka:
-  version: 0.10.2.1
+  version: 0.11.0.1
   port: 9092
   ras:
-    port: 9093
+    port: 8093
   heap: "{{ kafka_heap | default('1g') }}"
+  replicationFactor: "{{ kafka_replicationFactor | default((groups['kafkas']|length)|int) }}"
   topics:
     completed:
       segmentBytes: 536870912
       retentionBytes: "{{ kafka_topics_completed_retentionBytes | default(1073741824) }}"
-      retentionMS: 3600000
+      retentionMS: "{{ kafka_topics_completed_retentionMS | default(3600000) }}"
     health:
       segmentBytes: 536870912
       retentionBytes: "{{ kafka_topics_health_retentionBytes | default(1073741824) }}"
-      retentionMS: 3600000
+      retentionMS: "{{ kafka_topics_health_retentionMS | default(3600000) }}"
     invoker:
       segmentBytes: 536870912
       retentionBytes: "{{ kafka_topics_invoker_retentionBytes | default(1073741824) }}"
-      retentionMS: 172800000
+      retentionMS: "{{ kafka_topics_invoker_retentionMS | default(172800000) }}"
     cacheInvalidation:
       segmentBytes: 536870912
-      retentionBytes: 1073741824
-      retentionMS: 300000
+      retentionBytes: "{{ kafka_topics_cacheInvalidation_retentionBytes | default(1073741824) }}"
+      retentionMS: "{{ kafka_topics_cacheInvalidation_retentionMS | default(300000) }}" 
+
+kafka_connect_string: "{% set ret = [] %}\
+                       {% for host in groups['kafkas'] %}\
+                         {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.port+loop.index-1)|string) ) }}\
+                       {% endfor %}\
+                       {{ ret | join(',') }}"
 
 zookeeper:
   version: 3.4
   port: 2181
 
+zookeeper_connect_string: "{% set ret = [] %}\
+                           {% for host in groups['zookeepers'] %}\
+                             {{ ret.append( hostvars[host].ansible_host + ':' + ((zookeeper.port+loop.index-1)|string) ) }}\
+                           {% endfor %}\
+                           {{ ret | join(',') }}"
+
 invoker:
   port: 12001
   heap: "{{ invoker_heap | default('2g') }}"
diff --git a/ansible/kafka.yml b/ansible/kafka.yml
index 1b011b5..fba52f2 100644
--- a/ansible/kafka.yml
+++ b/ansible/kafka.yml
@@ -1,6 +1,10 @@
 ---
 # This playbook deploys an Openwhisk Kafka bus.  
 
-- hosts: kafka
+- hosts: zookeepers
+  roles:
+  - zookeeper
+
+- hosts: kafkas
   roles:
   - kafka
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 3978316..2aa0b56 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -51,11 +51,11 @@
       "WHISK_VERSION_DATE": "{{ whisk.version.date }}"
       "WHISK_VERSION_BUILDNO": "{{ docker.image.tag }}"
 
-      "KAFKA_HOST": "{{ groups['kafka']|first }}"
-      "KAFKA_HOST_PORT": "{{ kafka.port }}"
+      "KAFKA_HOSTS": "{{ kafka_connect_string }}"
       "KAFKA_TOPICS_COMPLETED_RETENTION_BYTES": "{{ kafka.topics.completed.retentionBytes }}"
       "KAFKA_TOPICS_COMPLETED_RETENTION_MS": "{{ kafka.topics.completed.retentionMS }}"
       "KAFKA_TOPICS_COMPLETED_SEGMENT_BYTES": "{{ kafka.topics.completed.segmentBytes }}"
+      "KAFKA_REPLICATIONFACTOR": "{{ kafka.replicationFactor }}"
 
       "DB_PROTOCOL": "{{ db_protocol }}"
       "DB_PROVIDER": "{{ db_provider }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 94c06a3..56b8100 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -117,13 +117,12 @@
         -e INVOKER_OPTS='{{ invoker.arguments }}'
         -e COMPONENT_NAME='invoker{{ groups['invokers'].index(inventory_hostname) }}'
         -e PORT='8080'
-        -e KAFKA_HOST='{{ groups['kafka']|first }}'
-        -e KAFKA_HOST_PORT='{{ kafka.port }}'
+        -e KAFKA_HOSTS='{{ kafka_connect_string }}'
         -e KAFKA_TOPICS_INVOKER_RETENTION_BYTES='{{ kafka.topics.invoker.retentionBytes }}'
         -e KAFKA_TOPICS_INVOKER_RETENTION_MS='{{ kafka.topics.invoker.retentionMS }}'
         -e KAFKA_TOPICS_INVOKER_SEGMENT_BYTES='{{ kafka.topics.invoker.segmentBytes }}'
-        -e ZOOKEEPER_HOST='{{ groups['kafka']|first }}'
-        -e ZOOKEEPER_HOST_PORT='{{ zookeeper.port }}'
+        -e KAFKA_REPLICATIONFACTOR='{{ kafka.replicationFactor }}'
+        -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
         -e DB_PROTOCOL='{{ db_protocol }}'
         -e DB_PROVIDER='{{ db_provider }}'
         -e DB_HOST='{{ db_host }}'
diff --git a/ansible/roles/kafka/tasks/clean.yml b/ansible/roles/kafka/tasks/clean.yml
index fa1db75..b9d5933 100644
--- a/ansible/roles/kafka/tasks/clean.yml
+++ b/ansible/roles/kafka/tasks/clean.yml
@@ -1,7 +1,7 @@
 ---
 # Remove kafka and zookeeper containers.
 
-- name: remove kafka
+- name: remove old kafka
   docker_container:
     name: kafka
     image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
@@ -9,10 +9,10 @@
     state: absent
   ignore_errors: True
 
-- name: remove zookeeper
+- name: remove kafka
   docker_container:
-    name: zookeeper
-    image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
+    name: kafka{{ groups['kafkas'].index(inventory_hostname) }}
+    image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
     keep_volumes: False
     state: absent
   ignore_errors: True
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index f410cc3..3d5845b 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -1,57 +1,40 @@
 ---
 # This role will install Kafka with Zookeeper in group 'kafka' in the environment inventory
 
-- name: "pull the zookeeper:{{ zookeeper.version }} image"
-  shell: "docker pull zookeeper:{{ zookeeper.version }}"
-  retries: "{{ docker.pull.retries }}"
-  delay: "{{ docker.pull.delay }}"
-
-- name: (re)start zookeeper
-  docker_container:
-    name: zookeeper
-    image: zookeeper:{{ zookeeper.version }}
-    state: started
-    recreate: true
-    restart_policy: "{{ docker.restart.policy }}"
-    ports:
-      - "{{ zookeeper.port }}:2181"
-
-- name: wait until the Zookeeper in this host is up and running
-  action: shell (echo ruok; sleep 1) | nc {{ ansible_host }} {{ zookeeper.port }}
-  register: result
-  until: (result.rc == 0) and (result.stdout == 'imok')
-  retries: 36
-  delay: 5
-
-- name: "pull the ches/kafka:{{ kafka.version }} image"
-  shell: "docker pull ches/kafka:{{ kafka.version }}"
+- name: "pull the wurstmeister/kafka:{{ kafka.version }} image"
+  shell: "docker pull wurstmeister/kafka:{{ kafka.version }}"
   retries: "{{ docker.pull.retries }}"
   delay: "{{ docker.pull.delay }}"
 
 - name: (re)start kafka
+  vars:
+    zookeeper_idx: "{{ groups['kafkas'].index(inventory_hostname) % (groups['zookeepers'] | length) }}"
   docker_container:
-    name: kafka
-    image: ches/kafka:{{ kafka.version }}
+    name: kafka{{ groups['kafkas'].index(inventory_hostname) }}
+    image: wurstmeister/kafka:{{ kafka.version }}
     state: started
     recreate: true
     restart_policy: "{{ docker.restart.policy }}"
-    links:
-      - "zookeeper:zookeeper"
     env:
+      "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+      "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
       "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
+      "KAFKA_ADVERTISED_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
       "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
+      "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
+      "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
     ports:
-      - "{{ kafka.port }}:9092"
+      - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
 
 - name: wait until the kafka server started up
-  shell: docker logs kafka
+  shell: docker logs kafka{{ groups['kafkas'].index(inventory_hostname) }}
   register: result
-  until: ('[Kafka Server 0], started' in result.stdout)
+  until: (('[Kafka Server ' + (groups['kafkas'].index(inventory_hostname)|string) + '], started') in result.stdout)
   retries: 10
   delay: 5
 
 - name: create the health and the cacheInvalidation topic
-  shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
+  shell: "docker exec kafka0 bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor {{ kafka.replicationFactor }} --partitions 1 --zookeeper $KAFKA_ZOOKEEPER_CONNECT --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
   register: command_result
   failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
   changed_when: "'Created topic' in command_result.stdout"
@@ -60,3 +43,4 @@
     settings: "{{ kafka.topics.health }}"
   - name: cacheInvalidation
     settings: "{{ kafka.topics.cacheInvalidation }}"
+  when: groups['kafkas'].index(inventory_hostname ) == 0
diff --git a/ansible/roles/kafka/tasks/clean.yml b/ansible/roles/zookeeper/tasks/clean.yml
similarity index 64%
copy from ansible/roles/kafka/tasks/clean.yml
copy to ansible/roles/zookeeper/tasks/clean.yml
index fa1db75..df82ab1 100644
--- a/ansible/roles/kafka/tasks/clean.yml
+++ b/ansible/roles/zookeeper/tasks/clean.yml
@@ -1,17 +1,17 @@
 ---
 # Remove kafka and zookeeper containers.
 
-- name: remove kafka
+- name: remove old zookeeper
   docker_container:
-    name: kafka
-    image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
+    name: zookeeper
+    image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
     keep_volumes: False
     state: absent
   ignore_errors: True
 
 - name: remove zookeeper
   docker_container:
-    name: zookeeper
+    name: zookeeper{{ groups['zookeepers'].index(inventory_hostname) }}
     image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
     keep_volumes: False
     state: absent
diff --git a/ansible/roles/zookeeper/tasks/deploy.yml b/ansible/roles/zookeeper/tasks/deploy.yml
new file mode 100644
index 0000000..9d88911
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/deploy.yml
@@ -0,0 +1,37 @@
+---
+# This role installs Zookeeper in group 'zookeepers' in the environment inventory
+
+- name: "pull the zookeeper:{{ zookeeper.version }} image"
+  shell: "docker pull zookeeper:{{ zookeeper.version }}"
+  retries: "{{ docker.pull.retries }}"
+  delay: "{{ docker.pull.delay }}"
+
+- name: (re)start zookeeper
+  docker_container:
+    name: zookeeper{{ groups['zookeepers'].index(inventory_hostname) }}
+    image: zookeeper:{{ zookeeper.version }}
+    state: started
+    recreate: true
+    restart_policy: "{{ docker.restart.policy }}"
+    env:
+        ZOO_MY_ID: "{{ groups['zookeepers'].index(inventory_hostname) + 1 }}"
+        ZOO_SERVERS: "{% set zhosts = [] %}
+                      {% for host in groups['zookeepers'] %}
+                        {% if host == inventory_hostname %}
+                          {{ zhosts.append('server.' + (loop.index|string) + '=' + '0.0.0.0:2888:3888') }}
+                        {% else %}
+                          {{ zhosts.append('server.' + (loop.index|string) + '=' + hostvars[host].ansible_host + ':' + ((2888+loop.index-1)|string) + ':' + ((3888+loop.index-1)|string) ) }}
+                        {% endif %}
+                      {% endfor %}
+                      {{ zhosts | join(' ') }}"
+    ports:
+      - "{{ zookeeper.port + groups['zookeepers'].index(inventory_hostname) }}:2181"
+      - "{{ 2888 + groups['zookeepers'].index(inventory_hostname) }}:2888"
+      - "{{ 3888 + groups['zookeepers'].index(inventory_hostname) }}:3888"
+
+- name: wait until the Zookeeper in this host is up and running
+  action: shell (echo ruok; sleep 1) | nc {{ ansible_host }} {{ zookeeper.port + groups['zookeepers'].index(inventory_hostname) }}
+  register: result
+  until: (result.rc == 0) and (result.stdout == 'imok')
+  retries: 36
+  delay: 5
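
As a rough illustration (assumed values, not project code), the ZOO_SERVERS
template above renders as follows on the first node of a two-node local
ensemble: the local node binds 0.0.0.0 on the standard peer ports, while
remote peers are addressed via their ansible_host and an index-based port
offset matching the published ports.

    object ZooServersSketch extends App {
      def zooServers(hosts: Seq[String], self: Int): String =
        hosts.zipWithIndex.map {
          case (_, i) if i == self => s"server.${i + 1}=0.0.0.0:2888:3888"
          case (h, i)              => s"server.${i + 1}=$h:${2888 + i}:${3888 + i}"
        }.mkString(" ")

      println(zooServers(Seq("172.17.0.1", "172.17.0.1"), self = 0))
      // server.1=0.0.0.0:2888:3888 server.2=172.17.0.1:2889:3889
    }
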
diff --git a/ansible/roles/zookeeper/tasks/main.yml b/ansible/roles/zookeeper/tasks/main.yml
new file mode 100644
index 0000000..0140e39
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/main.yml
@@ -0,0 +1,10 @@
+---
+# This role will install zookeeper in group 'zookeepers' in the environment inventory
+# In deploy mode it will deploy the zookeeper containers.
+# In clean mode it will remove the zookeeper containers.
+
+- include: deploy.yml
+  when: mode == "deploy"
+
+- include: clean.yml
+  when: mode == "clean"
\ No newline at end of file
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index ecef131..8ae7b8c 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -42,18 +42,16 @@ limits.triggers.fires.perMinute={{ limits.firesPerMinute }}
 limits.actions.sequence.maxLength={{ limits.sequenceMaxLength }}
 
 edge.host={{ groups["edge"]|first }}
-kafka.host={{ groups["kafka"]|first }}
+kafka.hosts={{ kafka_connect_string }}
 redis.host={{ groups["redis"]|default([""])|first }}
 router.host={{ groups["edge"]|first }}
-zookeeper.host={{ groups["kafka"]|first }}
+zookeeper.hosts={{ zookeeper_connect_string }}
 invoker.hosts={{ groups["invokers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
 
 edge.host.apiport=443
-zookeeper.host.port={{ zookeeper.port }}
-kafka.host.port={{ kafka.port }}
 kafkaras.host.port={{ kafka.ras.port }}
 redis.host.port={{ redis.port }}
-invoker.hosts.baseport={{ invoker.port }}
+invoker.hosts.basePort={{ invoker.port }}
 
 controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
 controller.host.basePort={{ controller.basePort }}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index fc6b593..5c61cc6 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -41,15 +41,15 @@ import whisk.core.connector.MessagingProvider
 object KafkaMessagingProvider extends MessagingProvider {
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging): MessageConsumer =
-    new KafkaConsumerConnector(config.kafkaHost, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
+    new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
 
   def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
-    new KafkaProducerConnector(config.kafkaHost, ec)
+    new KafkaProducerConnector(config.kafkaHosts, ec)
 
   def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])(
     implicit logging: Logging): Boolean = {
     val props = new Properties
-    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHost)
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
     val client = AdminClient.create(props)
     val numPartitions = topicConfig.getOrElse("numPartitions", "1").toInt
     val replicationFactor = topicConfig.getOrElse("replicationFactor", "1").toShort
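
ensureTopic now points the Kafka AdminClient at the full bootstrap list and
takes the replication factor from configuration instead of hard-coding 1. A
minimal standalone sketch of that call sequence (hypothetical values; the real
method additionally tolerates topics that already exist):

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
    import scala.collection.JavaConverters._

    object EnsureTopicSketch extends App {
      val props = new Properties
      // assumed local brokers; in OpenWhisk this comes from config.kafkaHosts
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.1:9092,172.17.0.1:9093")
      val client = AdminClient.create(props)
      try {
        // one partition, replication factor 2 for a two-broker cluster
        client.createTopics(Seq(new NewTopic("completed0", 1, 2.toShort)).asJava).all().get()
      } finally client.close()
    }
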
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 8e8289d..18b3824 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.errors.NotLeaderForPartitionException
 import org.apache.kafka.common.serialization.StringSerializer
 import whisk.common.Counter
 import whisk.common.Logging
@@ -36,7 +37,7 @@ import whisk.core.connector.Message
 import whisk.core.connector.MessageProducer
 import whisk.core.entity.UUIDs
 
-class KafkaProducerConnector(kafkahost: String,
+class KafkaProducerConnector(kafkahosts: String,
                              implicit val executionContext: ExecutionContext,
                              id: String = UUIDs.randomUUID().toString)(implicit logging: Logging)
     extends MessageProducer {
@@ -44,12 +45,13 @@ class KafkaProducerConnector(kafkahost: String,
   override def sentCount() = sentCounter.cur
 
   /** Sends msg to topic. This is an asynchronous operation. */
-  override def send(topic: String, msg: Message): Future[RecordMetadata] = {
+  override def send(topic: String, msg: Message, retry: Int = 2): Future[RecordMetadata] = {
     implicit val transid = msg.transid
     val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
 
     logging.debug(this, s"sending to topic '$topic' msg '$msg'")
     val produced = Promise[RecordMetadata]()
+
     producer.send(record, new Callback {
       override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
         if (exception == null) produced.success(metadata)
@@ -63,6 +65,13 @@ class KafkaProducerConnector(kafkahost: String,
         sentCounter.next()
       case Failure(t) =>
         logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")
+    } recoverWith {
+      case t: NotLeaderForPartitionException =>
+        if (retry > 0) {
+          logging.error(this, s"NotLeaderForPartitionException is retryable, remain $retry retry")
+          Thread.sleep(100)
+          send(topic, msg, retry - 1)
+        } else produced.future
     }
   }
 
@@ -76,7 +85,7 @@ class KafkaProducerConnector(kafkahost: String,
 
   private def getProps: Properties = {
     val props = new Properties
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
     props.put(ProducerConfig.ACKS_CONFIG, 1.toString)
     props
   }
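
The new recoverWith branch above re-issues the send when Kafka reports
NotLeaderForPartitionException, which is transient while partition leadership
moves to another broker after a failure. A generic sketch of that pattern
(hypothetical helper, not part of the connector's API):

    import scala.concurrent.{ExecutionContext, Future}

    object RetrySketch {
      // Retry an asynchronous operation a bounded number of times when the
      // failure is considered transient, sleeping briefly between attempts.
      def retryOnFailure[T](retriesLeft: Int)(attempt: () => Future[T])(
        isTransient: Throwable => Boolean)(implicit ec: ExecutionContext): Future[T] =
        attempt().recoverWith {
          case t if isTransient(t) && retriesLeft > 0 =>
            Thread.sleep(100) // fixed back-off, as in the connector above
            retryOnFailure(retriesLeft - 1)(attempt)(isTransient)
        }
    }
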
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7a724d0..c08fcf1 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -75,14 +75,14 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val controllerInstances = this(WhiskConfig.controllerInstances)
 
   val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort)
-  val kafkaHost = this(WhiskConfig.kafkaHostName) + ":" + this(WhiskConfig.kafkaHostPort)
+  val kafkaHosts = this(WhiskConfig.kafkaHostList)
   val redisHostName = this(WhiskConfig.redisHostName)
   val redisHostPort = this(WhiskConfig.redisHostPort)
 
   val edgeHostName = this(WhiskConfig.edgeHostName)
 
-  val zookeeperHost = this(WhiskConfig.zookeeperHostName) + ":" + this(WhiskConfig.zookeeperHostPort)
   val invokerHosts = this(WhiskConfig.invokerHostsList)
+  val zookeeperHosts = this(WhiskConfig.zookeeperHostList)
 
   val dbProvider = this(WhiskConfig.dbProvider)
   val dbUsername = this(WhiskConfig.dbUsername)
@@ -103,6 +103,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val kafkaTopicsCompletedRetentionBytes = this(WhiskConfig.kafkaTopicsCompletedRetentionBytes)
   val kafkaTopicsCompletedRetentionMS = this(WhiskConfig.kafkaTopicsCompletedRetentionMS)
   val kafkaTopicsCompletedSegmentBytes = this(WhiskConfig.kafkaTopicsCompletedSegmentBytes)
+  val kafkaReplicationFactor = this(WhiskConfig.kafkaReplicationFactor)
 
   val runtimesManifest = this(WhiskConfig.runtimesManifest)
   val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
@@ -224,20 +225,19 @@ object WhiskConfig {
 
   val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
 
-  val kafkaHostName = "kafka.host"
-  val zookeeperHostName = "zookeeper.host"
+  val kafkaHostList = "kafka.hosts"
+  val zookeeperHostList = "zookeeper.hosts"
   val redisHostName = "redis.host"
 
   private val edgeHostApiPort = "edge.host.apiport"
-  val kafkaHostPort = "kafka.host.port"
   val redisHostPort = "redis.host.port"
-  val zookeeperHostPort = "zookeeper.host.port"
 
   val invokerHostsList = "invoker.hosts"
 
   val edgeHost = Map(edgeHostName -> null, edgeHostApiPort -> null)
   val invokerHosts = Map(invokerHostsList -> null)
-  val kafkaHost = Map(kafkaHostName -> null, kafkaHostPort -> null)
+  val kafkaHosts = Map(kafkaHostList -> null)
+  val zookeeperHosts = Map(zookeeperHostList -> null)
 
   val runtimesManifest = "runtimes.manifest"
 
@@ -247,6 +247,7 @@ object WhiskConfig {
   val kafkaTopicsCompletedRetentionBytes = "kafka.topics.completed.retentionBytes"
   val kafkaTopicsCompletedRetentionMS = "kafka.topics.completed.retentionMS"
   val kafkaTopicsCompletedSegmentBytes = "kafka.topics.completed.segmentBytes"
+  val kafkaReplicationFactor = "kafka.replicationFactor"
 
   val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
   val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
index 53d8e2f..de37c33 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
@@ -27,7 +27,7 @@ trait MessageProducer {
   def sentCount(): Long
 
   /** Sends msg to topic. This is an asynchronous operation. */
-  def send(topic: String, msg: Message): Future[RecordMetadata]
+  def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata]
 
   /** Closes producer. */
   def close(): Unit
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index d41a0dc..80a6a90 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -217,7 +217,7 @@ object Controller {
           "completed" + instance,
           Map(
             "numPartitions" -> "1",
-            "replicationFactor" -> "1",
+            "replicationFactor" -> config.kafkaReplicationFactor,
             "retention.bytes" -> config.kafkaTopicsCompletedRetentionBytes,
             "retention.ms" -> config.kafkaTopicsCompletedRetentionMS,
             "segment.bytes" -> config.kafkaTopicsCompletedSegmentBytes))) {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index e38f69c..be3bb0b 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -348,11 +348,12 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
 
 object LoadBalancerService {
   def requiredProperties =
-    kafkaHost ++
+    kafkaHosts ++
       Map(
         kafkaTopicsCompletedRetentionBytes -> 1024.MB.toBytes.toString,
         kafkaTopicsCompletedRetentionMS -> 1.hour.toMillis.toString,
-        kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString) ++
+        kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString,
+        kafkaReplicationFactor -> "1") ++
       Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
 
   /** Memoizes the result of `f` for later use. */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1c6853a..1a71e29 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -61,12 +61,13 @@ object Invoker {
       ExecManifest.requiredProperties ++
       WhiskEntityStore.requiredProperties ++
       WhiskActivationStore.requiredProperties ++
-      kafkaHost ++
+      kafkaHosts ++
       Map(
         kafkaTopicsInvokerRetentionBytes -> 1024.MB.toBytes.toString,
         kafkaTopicsInvokerRetentionMS -> 48.hour.toMillis.toString,
-        kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString) ++
-      Map(zookeeperHostName -> "", zookeeperHostPort -> "") ++
+        kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString,
+        kafkaReplicationFactor -> "1") ++
+      zookeeperHosts ++
       wskApiHost ++ Map(
       dockerImageTag -> "latest",
       invokerNumCore -> "4",
@@ -135,17 +136,17 @@ object Invoker {
         id
       }
       .getOrElse {
-        if (config.zookeeperHost.startsWith(":") || config.zookeeperHost.endsWith(":")) {
-          abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHost})")
+        if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
+          abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
         }
         val invokerName = cmdLineArgs.name.getOrElse(config.invokerName)
         if (invokerName.trim.isEmpty) {
           abort("Invoker name can't be empty to use dynamicId assignment.")
         }
 
-        logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHost}")
+        logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHosts}")
         val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
-        val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHost, retryPolicy)
+        val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHosts, retryPolicy)
         zkClient.start()
         zkClient.blockUntilConnected()
         logger.info(this, "invokerReg: connected to zookeeper")
@@ -192,7 +193,7 @@ object Invoker {
           "invoker" + assignedInvokerId,
           Map(
             "numPartitions" -> "1",
-            "replicationFactor" -> "1",
+            "replicationFactor" -> config.kafkaReplicationFactor,
             "retention.bytes" -> config.kafkaTopicsInvokerRetentionBytes,
             "retention.ms" -> config.kafkaTopicsInvokerRetentionMS,
             "segment.bytes" -> config.kafkaTopicsInvokerSegmentBytes))) {
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 781f0bf..0971b23 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -132,12 +132,8 @@ public class WhiskProperties {
         return whiskProperties.getProperty(string);
     }
 
-    public static String getKafkaHost() {
-        return whiskProperties.getProperty("kafka.host");
-    }
-
-    public static int getKafkaPort() {
-        return Integer.parseInt(whiskProperties.getProperty("kafka.host.port"));
+    public static String getKafkaHosts() {
+        return whiskProperties.getProperty("kafka.hosts");
     }
 
     public static int getKafkaMonitorPort() {
@@ -145,11 +141,7 @@ public class WhiskProperties {
     }
 
     public static String getZookeeperHost() {
-        return whiskProperties.getProperty("zookeeper.host");
-    }
-
-    public static int getZookeeperPort() {
-        return Integer.parseInt(whiskProperties.getProperty("zookeeper.host.port"));
+        return whiskProperties.getProperty("zookeeper.hosts");
     }
 
     public static String getMainDockerEndpoint() {
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 20592a5..8974be3 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -17,43 +17,43 @@
 
 package services
 
+import java.io.File
 import java.util.Calendar
 
 import scala.concurrent.Await
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 import scala.language.postfixOps
-
+import scala.util.Try
 import org.apache.kafka.clients.consumer.CommitFailedException
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
-
-import common.StreamLogging
-import common.WskActorSystem
+import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
 import whisk.common.TransactionId
 import whisk.connector.kafka.KafkaConsumerConnector
 import whisk.connector.kafka.KafkaProducerConnector
 import whisk.core.WhiskConfig
 import whisk.core.connector.Message
 import whisk.utils.ExecutionContextFactory
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging {
   implicit val transid = TransactionId.testing
   implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
 
-  val config = new WhiskConfig(WhiskConfig.kafkaHost)
+  val config = new WhiskConfig(WhiskConfig.kafkaHosts)
   assert(config.isValid)
 
   val groupid = "kafkatest"
   val topic = "Dinosaurs"
   val sessionTimeout = 10 seconds
   val maxPollInterval = 10 seconds
-  val producer = new KafkaProducerConnector(config.kafkaHost, ec)
+  val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
   val consumer = new KafkaConsumerConnector(
-    config.kafkaHost,
+    config.kafkaHosts,
     groupid,
     topic,
     sessionTimeout = sessionTimeout,
@@ -65,17 +65,34 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
     super.afterAll()
   }
 
+  def commandComponent(host: String, command: String, component: String) = {
+    def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
+    val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
+    val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort)
+    val cmd = Seq(docker, "--host", host + ":" + dockerPort, command, component)
+
+    TestUtils.runCmd(0, new File("."), cmd: _*)
+  }
+
+  def sendAndReceiveMessage(message: Message,
+                            waitForSend: FiniteDuration,
+                            waitForReceive: FiniteDuration): Iterable[String] = {
+    val start = java.lang.System.currentTimeMillis
+    val sent = Await.result(producer.send(topic, message), waitForSend)
+    val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
+    val end = java.lang.System.currentTimeMillis
+    val elapsed = end - start
+    println(s"Received ${received.size}. Took $elapsed msec: $received\n")
+
+    received
+  }
+
   behavior of "Kafka connector"
 
   it should "send and receive a kafka message which sets up the topic" in {
     for (i <- 0 until 5) {
       val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
-      val start = java.lang.System.currentTimeMillis
-      val sent = Await.result(producer.send(topic, message), 20 seconds)
-      val received = consumer.peek(10 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
-      val end = java.lang.System.currentTimeMillis
-      val elapsed = end - start
-      println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
+      val received = sendAndReceiveMessage(message, 20 seconds, 10 seconds)
       received.size should be >= 1
       received.last should be(message.serialize)
       consumer.commit()
@@ -85,12 +102,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   it should "send and receive a kafka message even after session timeout" in {
     for (i <- 0 until 4) {
       val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
-      val start = java.lang.System.currentTimeMillis
-      val sent = Await.result(producer.send(topic, message), 1 seconds)
-      val received = consumer.peek(1 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
-      val end = java.lang.System.currentTimeMillis
-      val elapsed = end - start
-      println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
+      val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
 
       // only the last iteration will have an updated cursor
       // iteration 0: get whatever is on the topic (at least 1 but may be more if a previous test failed)
@@ -112,4 +124,32 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
       } else consumer.commit()
     }
   }
+
+  it should "send and receive a kafka message even after shutdown one of instances" in {
+    val kafkaHosts = config.kafkaHosts.split(",")
+    if (kafkaHosts.length > 1) {
+      for (i <- 0 until kafkaHosts.length) {
+        val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+        val kafkaHost = kafkaHosts(i).split(":")(0)
+        val startLog = s", started"
+        val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
+
+        commandComponent(kafkaHost, "stop", s"kafka$i")
+        var received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
+        received.size should be(1)
+        consumer.commit()
+
+        commandComponent(kafkaHost, "start", s"kafka$i")
+        retry({
+          startLog.r
+            .findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout)
+            .length shouldBe prevCount + 1
+        }, 20, Some(1.second)) // wait until kafka is up
+
+        received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
+        received.size should be(1)
+        consumer.commit()
+      }
+    }
+  }
 }
diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index 652dcb1..e3b9597 100644
--- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -73,7 +73,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
   }
 
   private val producer = new MessageProducer {
-    def send(topic: String, msg: Message): Future[RecordMetadata] = {
+    def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata] = {
       queue.synchronized {
         if (queue.offer(msg)) {
           logging.info(this, s"put: $msg")
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index eaa288b..b98b711 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -263,13 +263,13 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
   it should "be aborted when exceeding its memory limits" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
     val name = "TestNodeJsMemoryExceeding"
     assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
-      val allowedMemory = 256.megabytes
+      val allowedMemory = 128.megabytes
       val actionName = TestUtils.getTestActionFilename("memoryWithGC.js")
       (action, _) =>
         action.create(name, Some(actionName), memory = Some(allowedMemory))
     }
 
-    val run = wsk.action.invoke(name, Map("payload" -> 512.toJson))
+    val run = wsk.action.invoke(name, Map("payload" -> 256.toJson))
     withActivation(wsk.activation, run) {
       _.response.result.get.fields("error") shouldBe Messages.memoryExhausted.toJson
     }
