You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by je...@apache.org on 2017/11/28 10:45:24 UTC
[incubator-openwhisk] branch master updated: Deploy kafka &
zookeeper cluster with ansible (#2744)
This is an automated email from the ASF dual-hosted git repository.
jeremiaswerner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 21d37a1 Deploy kafka & zookeeper cluster with ansible (#2744)
21d37a1 is described below
commit 21d37a1af329544e245d376ad613d1a603fc104c
Author: Sang Heon Lee <de...@gmail.com>
AuthorDate: Tue Nov 28 19:45:21 2017 +0900
Deploy kafka & zookeeper cluster with ansible (#2744)
* Deploy kafka cluster with ansible
- Enable deploying zookeeper & kafka cluster in separate hosts
- use quorum number as default replicationFactor
- Add ansible task to remove legacy kafka&zookeeper container
- Add shutdown test for kafka cluster
- Reduce default kafka heap size to 0.5g
- Make kafkaReplicationFactor configurable
- Increase kafka timeout in test
- Wait until kafka is up in kafka shooting test
- Reduce memory usage in ActionLimitsTests
- Bump kafka broker version to 0.11.0.1
- Add sleep in kafka retry
---
ansible/environments/distributed/hosts | 5 +-
ansible/environments/docker-machine/hosts.j2.ini | 5 +-
ansible/environments/local/group_vars/all | 9 +++
ansible/environments/local/hosts | 8 ++-
ansible/group_vars/all | 27 ++++++--
ansible/kafka.yml | 6 +-
ansible/roles/controller/tasks/deploy.yml | 4 +-
ansible/roles/invoker/tasks/deploy.yml | 7 +-
ansible/roles/kafka/tasks/clean.yml | 8 +--
ansible/roles/kafka/tasks/deploy.yml | 48 +++++--------
ansible/roles/{kafka => zookeeper}/tasks/clean.yml | 8 +--
ansible/roles/zookeeper/tasks/deploy.yml | 37 ++++++++++
ansible/roles/zookeeper/tasks/main.yml | 10 +++
ansible/templates/whisk.properties.j2 | 8 +--
.../connector/kafka/KafkaMessagingProvider.scala | 6 +-
.../connector/kafka/KafkaProducerConnector.scala | 15 +++-
.../src/main/scala/whisk/core/WhiskConfig.scala | 15 ++--
.../whisk/core/connector/MessageProducer.scala | 2 +-
.../scala/whisk/core/controller/Controller.scala | 2 +-
.../core/loadBalancer/LoadBalancerService.scala | 5 +-
.../main/scala/whisk/core/invoker/Invoker.scala | 17 ++---
tests/src/test/scala/common/WhiskProperties.java | 14 +---
.../test/scala/services/KafkaConnectorTests.scala | 80 ++++++++++++++++------
.../whisk/core/connector/test/TestConnector.scala | 2 +-
.../whisk/core/limits/ActionLimitsTests.scala | 4 +-
25 files changed, 230 insertions(+), 122 deletions(-)
diff --git a/ansible/environments/distributed/hosts b/ansible/environments/distributed/hosts
index 0880e6d..9c9045b 100755
--- a/ansible/environments/distributed/hosts
+++ b/ansible/environments/distributed/hosts
@@ -18,9 +18,12 @@ edge
[controllers]
10.3.2.155 ansible_host=10.3.2.155
-[kafka]
+[kafkas]
10.3.2.156 ansible_host=10.3.2.156
+[zookeepers:children]
+kafkas
+
[invokers]
10.3.2.158 ansible_host=10.3.2.158
10.3.2.159 ansible_host=10.3.2.159
diff --git a/ansible/environments/docker-machine/hosts.j2.ini b/ansible/environments/docker-machine/hosts.j2.ini
index 4301576..a4990ae 100644
--- a/ansible/environments/docker-machine/hosts.j2.ini
+++ b/ansible/environments/docker-machine/hosts.j2.ini
@@ -10,9 +10,12 @@ ansible ansible_connection=local
controller0 ansible_host={{ docker_machine_ip }}
controller1 ansible_host={{ docker_machine_ip }}
-[kafka]
+[kafkas]
{{ docker_machine_ip }} ansible_host={{ docker_machine_ip }}
+[zookeepers:children]
+kafkas
+
[invokers]
invoker0 ansible_host={{ docker_machine_ip }}
invoker1 ansible_host={{ docker_machine_ip }}
diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all
index 3860f33..c3b3c12 100755
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -23,3 +23,12 @@ controller_arguments: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxre
invoker_arguments: "{{ controller_arguments }}"
invoker_allow_multiple_instances: true
+
+# Set kafka heap size and topic retention
+kafka_heap: '512m'
+kafka_topics_completed_retentionBytes: 104857600
+kafka_topics_completed_retentionMS: 300000
+kafka_topics_health_retentionBytes: 104857600
+kafka_topics_health_retentionMS: 300000
+kafka_topics_invoker_retentionBytes: 104857600
+kafka_topics_invoker_retentionMS: 300000
diff --git a/ansible/environments/local/hosts b/ansible/environments/local/hosts
index 00702f3..5163408 100644
--- a/ansible/environments/local/hosts
+++ b/ansible/environments/local/hosts
@@ -10,8 +10,12 @@ ansible ansible_connection=local
controller0 ansible_host=172.17.0.1 ansible_connection=local
controller1 ansible_host=172.17.0.1 ansible_connection=local
-[kafka]
-172.17.0.1 ansible_host=172.17.0.1 ansible_connection=local
+[kafkas]
+kafka0 ansible_host=172.17.0.1 ansible_connection=local
+kafka1 ansible_host=172.17.0.1 ansible_connection=local
+
+[zookeepers:children]
+kafkas
[invokers]
invoker0 ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 02c2b08..f1a746d 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -129,33 +129,46 @@ registry:
confdir: "{{ config_root_dir }}/registry"
kafka:
- version: 0.10.2.1
+ version: 0.11.0.1
port: 9092
ras:
- port: 9093
+ port: 8093
heap: "{{ kafka_heap | default('1g') }}"
+ replicationFactor: "{{ kafka_replicationFactor | default((groups['kafkas']|length)|int) }}"
topics:
completed:
segmentBytes: 536870912
retentionBytes: "{{ kafka_topics_completed_retentionBytes | default(1073741824) }}"
- retentionMS: 3600000
+ retentionMS: "{{ kafka_topics_completed_retentionMS | default(3600000) }}"
health:
segmentBytes: 536870912
retentionBytes: "{{ kafka_topics_health_retentionBytes | default(1073741824) }}"
- retentionMS: 3600000
+ retentionMS: "{{ kafka_topics_health_retentionMS | default(3600000) }}"
invoker:
segmentBytes: 536870912
retentionBytes: "{{ kafka_topics_invoker_retentionBytes | default(1073741824) }}"
- retentionMS: 172800000
+ retentionMS: "{{ kafka_topics_invoker_retentionMS | default(172800000) }}"
cacheInvalidation:
segmentBytes: 536870912
- retentionBytes: 1073741824
- retentionMS: 300000
+ retentionBytes: "{{ kafka_topics_cacheInvalidation_retentionBytes | default(1073741824) }}"
+ retentionMS: "{{ kafka_topics_cacheInvalidation_retentionMS | default(300000) }}"
+
+kafka_connect_string: "{% set ret = [] %}\
+ {% for host in groups['kafkas'] %}\
+ {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.port+loop.index-1)|string) ) }}\
+ {% endfor %}\
+ {{ ret | join(',') }}"
zookeeper:
version: 3.4
port: 2181
+zookeeper_connect_string: "{% set ret = [] %}\
+ {% for host in groups['zookeepers'] %}\
+ {{ ret.append( hostvars[host].ansible_host + ':' + ((zookeeper.port+loop.index-1)|string) ) }}\
+ {% endfor %}\
+ {{ ret | join(',') }}"
+
invoker:
port: 12001
heap: "{{ invoker_heap | default('2g') }}"
diff --git a/ansible/kafka.yml b/ansible/kafka.yml
index 1b011b5..fba52f2 100644
--- a/ansible/kafka.yml
+++ b/ansible/kafka.yml
@@ -1,6 +1,10 @@
---
# This playbook deploys an Openwhisk Kafka bus.
-- hosts: kafka
+- hosts: zookeepers
+ roles:
+ - zookeeper
+
+- hosts: kafkas
roles:
- kafka
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 3978316..2aa0b56 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -51,11 +51,11 @@
"WHISK_VERSION_DATE": "{{ whisk.version.date }}"
"WHISK_VERSION_BUILDNO": "{{ docker.image.tag }}"
- "KAFKA_HOST": "{{ groups['kafka']|first }}"
- "KAFKA_HOST_PORT": "{{ kafka.port }}"
+ "KAFKA_HOSTS": "{{ kafka_connect_string }}"
"KAFKA_TOPICS_COMPLETED_RETENTION_BYTES": "{{ kafka.topics.completed.retentionBytes }}"
"KAFKA_TOPICS_COMPLETED_RETENTION_MS": "{{ kafka.topics.completed.retentionMS }}"
"KAFKA_TOPICS_COMPLETED_SEGMENT_BYTES": "{{ kafka.topics.completed.segmentBytes }}"
+ "KAFKA_REPLICATIONFACTOR": "{{ kafka.replicationFactor }}"
"DB_PROTOCOL": "{{ db_protocol }}"
"DB_PROVIDER": "{{ db_provider }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 94c06a3..56b8100 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -117,13 +117,12 @@
-e INVOKER_OPTS='{{ invoker.arguments }}'
-e COMPONENT_NAME='invoker{{ groups['invokers'].index(inventory_hostname) }}'
-e PORT='8080'
- -e KAFKA_HOST='{{ groups['kafka']|first }}'
- -e KAFKA_HOST_PORT='{{ kafka.port }}'
+ -e KAFKA_HOSTS='{{ kafka_connect_string }}'
-e KAFKA_TOPICS_INVOKER_RETENTION_BYTES='{{ kafka.topics.invoker.retentionBytes }}'
-e KAFKA_TOPICS_INVOKER_RETENTION_MS='{{ kafka.topics.invoker.retentionMS }}'
-e KAFKA_TOPICS_INVOKER_SEGMENT_BYTES='{{ kafka.topics.invoker.segmentBytes }}'
- -e ZOOKEEPER_HOST='{{ groups['kafka']|first }}'
- -e ZOOKEEPER_HOST_PORT='{{ zookeeper.port }}'
+ -e KAFKA_REPLICATIONFACTOR='{{ kafka.replicationFactor }}'
+ -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
-e DB_PROTOCOL='{{ db_protocol }}'
-e DB_PROVIDER='{{ db_provider }}'
-e DB_HOST='{{ db_host }}'
diff --git a/ansible/roles/kafka/tasks/clean.yml b/ansible/roles/kafka/tasks/clean.yml
index fa1db75..b9d5933 100644
--- a/ansible/roles/kafka/tasks/clean.yml
+++ b/ansible/roles/kafka/tasks/clean.yml
@@ -1,7 +1,7 @@
---
# Remove kafka and zookeeper containers.
-- name: remove kafka
+- name: remove old kafka
docker_container:
name: kafka
image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
@@ -9,10 +9,10 @@
state: absent
ignore_errors: True
-- name: remove zookeeper
+- name: remove kafka
docker_container:
- name: zookeeper
- image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
+ name: kafka{{ groups['kafkas'].index(inventory_hostname) }}
+ image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
keep_volumes: False
state: absent
ignore_errors: True
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index f410cc3..3d5845b 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -1,57 +1,40 @@
---
# This role will install Kafka with Zookeeper in group 'kafka' in the environment inventory
-- name: "pull the zookeeper:{{ zookeeper.version }} image"
- shell: "docker pull zookeeper:{{ zookeeper.version }}"
- retries: "{{ docker.pull.retries }}"
- delay: "{{ docker.pull.delay }}"
-
-- name: (re)start zookeeper
- docker_container:
- name: zookeeper
- image: zookeeper:{{ zookeeper.version }}
- state: started
- recreate: true
- restart_policy: "{{ docker.restart.policy }}"
- ports:
- - "{{ zookeeper.port }}:2181"
-
-- name: wait until the Zookeeper in this host is up and running
- action: shell (echo ruok; sleep 1) | nc {{ ansible_host }} {{ zookeeper.port }}
- register: result
- until: (result.rc == 0) and (result.stdout == 'imok')
- retries: 36
- delay: 5
-
-- name: "pull the ches/kafka:{{ kafka.version }} image"
- shell: "docker pull ches/kafka:{{ kafka.version }}"
+- name: "pull the wurstmeister/kafka:{{ kafka.version }} image"
+ shell: "docker pull wurstmeister/kafka:{{ kafka.version }}"
retries: "{{ docker.pull.retries }}"
delay: "{{ docker.pull.delay }}"
- name: (re)start kafka
+ vars:
+ zookeeper_idx: "{{ groups['kafkas'].index(inventory_hostname) % (groups['zookeepers'] | length) }}"
docker_container:
- name: kafka
- image: ches/kafka:{{ kafka.version }}
+ name: kafka{{ groups['kafkas'].index(inventory_hostname) }}
+ image: wurstmeister/kafka:{{ kafka.version }}
state: started
recreate: true
restart_policy: "{{ docker.restart.policy }}"
- links:
- - "zookeeper:zookeeper"
env:
+ "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+ "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
+ "KAFKA_ADVERTISED_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
+ "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
+ "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
ports:
- - "{{ kafka.port }}:9092"
+ - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
- name: wait until the kafka server started up
- shell: docker logs kafka
+ shell: docker logs kafka{{ groups['kafkas'].index(inventory_hostname) }}
register: result
- until: ('[Kafka Server 0], started' in result.stdout)
+ until: (('[Kafka Server ' + (groups['kafkas'].index(inventory_hostname)|string) + '], started') in result.stdout)
retries: 10
delay: 5
- name: create the health and the cacheInvalidation topic
- shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
+ shell: "docker exec kafka0 bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor {{ kafka.replicationFactor }} --partitions 1 --zookeeper $KAFKA_ZOOKEEPER_CONNECT --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
register: command_result
failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
changed_when: "'Created topic' in command_result.stdout"
@@ -60,3 +43,4 @@
settings: "{{ kafka.topics.health }}"
- name: cacheInvalidation
settings: "{{ kafka.topics.cacheInvalidation }}"
+ when: groups['kafkas'].index(inventory_hostname ) == 0
diff --git a/ansible/roles/kafka/tasks/clean.yml b/ansible/roles/zookeeper/tasks/clean.yml
similarity index 64%
copy from ansible/roles/kafka/tasks/clean.yml
copy to ansible/roles/zookeeper/tasks/clean.yml
index fa1db75..df82ab1 100644
--- a/ansible/roles/kafka/tasks/clean.yml
+++ b/ansible/roles/zookeeper/tasks/clean.yml
@@ -1,17 +1,17 @@
---
# Remove kafka and zookeeper containers.
-- name: remove kafka
+- name: remove old zookeeper
docker_container:
- name: kafka
- image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
+ name: zookeeper
+ image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
keep_volumes: False
state: absent
ignore_errors: True
- name: remove zookeeper
docker_container:
- name: zookeeper
+ name: zookeeper{{ groups['zookeepers'].index(inventory_hostname) }}
image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
keep_volumes: False
state: absent
diff --git a/ansible/roles/zookeeper/tasks/deploy.yml b/ansible/roles/zookeeper/tasks/deploy.yml
new file mode 100644
index 0000000..9d88911
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/deploy.yml
@@ -0,0 +1,37 @@
+---
+# This role will install Zookeeper in group 'zookeepers' in the environment inventory
+
+- name: "pull the zookeeper:{{ zookeeper.version }} image"
+ shell: "docker pull zookeeper:{{ zookeeper.version }}"
+ retries: "{{ docker.pull.retries }}"
+ delay: "{{ docker.pull.delay }}"
+
+- name: (re)start zookeeper
+ docker_container:
+ name: zookeeper{{ groups['zookeepers'].index(inventory_hostname) }}
+ image: zookeeper:{{ zookeeper.version }}
+ state: started
+ recreate: true
+ restart_policy: "{{ docker.restart.policy }}"
+ env:
+ ZOO_MY_ID: "{{ groups['zookeepers'].index(inventory_hostname) + 1 }}"
+ ZOO_SERVERS: "{% set zhosts = [] %}
+ {% for host in groups['zookeepers'] %}
+ {% if host == inventory_hostname %}
+ {{ zhosts.append('server.' + (loop.index|string) + '=' + '0.0.0.0:2888:3888') }}
+ {% else %}
+ {{ zhosts.append('server.' + (loop.index|string) + '=' + hostvars[host].ansible_host + ':' + ((2888+loop.index-1)|string) + ':' + ((3888+loop.index-1)|string) ) }}
+ {% endif %}
+ {% endfor %}
+ {{ zhosts | join(' ') }}"
+ ports:
+ - "{{ zookeeper.port + groups['zookeepers'].index(inventory_hostname) }}:2181"
+ - "{{ 2888 + groups['zookeepers'].index(inventory_hostname) }}:2888"
+ - "{{ 3888 + groups['zookeepers'].index(inventory_hostname) }}:3888"
+
+- name: wait until the Zookeeper in this host is up and running
+ action: shell (echo ruok; sleep 1) | nc {{ ansible_host }} {{ zookeeper.port + groups['zookeepers'].index(inventory_hostname) }}
+ register: result
+ until: (result.rc == 0) and (result.stdout == 'imok')
+ retries: 36
+ delay: 5
diff --git a/ansible/roles/zookeeper/tasks/main.yml b/ansible/roles/zookeeper/tasks/main.yml
new file mode 100644
index 0000000..0140e39
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/main.yml
@@ -0,0 +1,10 @@
+---
+# This role will install zookeeper in group 'zookeepers' in the environment inventory
+# In deploy mode it will deploy zookeeper.
+# In clean mode it will remove the zookeeper containers.
+
+- include: deploy.yml
+ when: mode == "deploy"
+
+- include: clean.yml
+ when: mode == "clean"
\ No newline at end of file
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index ecef131..8ae7b8c 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -42,18 +42,16 @@ limits.triggers.fires.perMinute={{ limits.firesPerMinute }}
limits.actions.sequence.maxLength={{ limits.sequenceMaxLength }}
edge.host={{ groups["edge"]|first }}
-kafka.host={{ groups["kafka"]|first }}
+kafka.hosts={{ kafka_connect_string }}
redis.host={{ groups["redis"]|default([""])|first }}
router.host={{ groups["edge"]|first }}
-zookeeper.host={{ groups["kafka"]|first }}
+zookeeper.hosts={{ zookeeper_connect_string }}
invoker.hosts={{ groups["invokers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
edge.host.apiport=443
-zookeeper.host.port={{ zookeeper.port }}
-kafka.host.port={{ kafka.port }}
kafkaras.host.port={{ kafka.ras.port }}
redis.host.port={{ redis.port }}
-invoker.hosts.baseport={{ invoker.port }}
+invoker.hosts.basePort={{ invoker.port }}
controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
controller.host.basePort={{ controller.basePort }}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index fc6b593..5c61cc6 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -41,15 +41,15 @@ import whisk.core.connector.MessagingProvider
object KafkaMessagingProvider extends MessagingProvider {
def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
implicit logging: Logging): MessageConsumer =
- new KafkaConsumerConnector(config.kafkaHost, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
+ new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
- new KafkaProducerConnector(config.kafkaHost, ec)
+ new KafkaProducerConnector(config.kafkaHosts, ec)
def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])(
implicit logging: Logging): Boolean = {
val props = new Properties
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHost)
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
val client = AdminClient.create(props)
val numPartitions = topicConfig.getOrElse("numPartitions", "1").toInt
val replicationFactor = topicConfig.getOrElse("replicationFactor", "1").toShort
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 8e8289d..18b3824 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.errors.NotLeaderForPartitionException
import org.apache.kafka.common.serialization.StringSerializer
import whisk.common.Counter
import whisk.common.Logging
@@ -36,7 +37,7 @@ import whisk.core.connector.Message
import whisk.core.connector.MessageProducer
import whisk.core.entity.UUIDs
-class KafkaProducerConnector(kafkahost: String,
+class KafkaProducerConnector(kafkahosts: String,
implicit val executionContext: ExecutionContext,
id: String = UUIDs.randomUUID().toString)(implicit logging: Logging)
extends MessageProducer {
@@ -44,12 +45,13 @@ class KafkaProducerConnector(kafkahost: String,
override def sentCount() = sentCounter.cur
/** Sends msg to topic. This is an asynchronous operation. */
- override def send(topic: String, msg: Message): Future[RecordMetadata] = {
+ override def send(topic: String, msg: Message, retry: Int = 2): Future[RecordMetadata] = {
implicit val transid = msg.transid
val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
logging.debug(this, s"sending to topic '$topic' msg '$msg'")
val produced = Promise[RecordMetadata]()
+
producer.send(record, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception == null) produced.success(metadata)
@@ -63,6 +65,13 @@ class KafkaProducerConnector(kafkahost: String,
sentCounter.next()
case Failure(t) =>
logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")
+ } recoverWith {
+ case t: NotLeaderForPartitionException =>
+ if (retry > 0) {
+ logging.error(this, s"NotLeaderForPartitionException is retryable, remain $retry retry")
+ Thread.sleep(100)
+ send(topic, msg, retry - 1)
+ } else produced.future
}
}
@@ -76,7 +85,7 @@ class KafkaProducerConnector(kafkahost: String,
private def getProps: Properties = {
val props = new Properties
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
props.put(ProducerConfig.ACKS_CONFIG, 1.toString)
props
}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7a724d0..c08fcf1 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -75,14 +75,14 @@ class WhiskConfig(requiredProperties: Map[String, String],
val controllerInstances = this(WhiskConfig.controllerInstances)
val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort)
- val kafkaHost = this(WhiskConfig.kafkaHostName) + ":" + this(WhiskConfig.kafkaHostPort)
+ val kafkaHosts = this(WhiskConfig.kafkaHostList)
val redisHostName = this(WhiskConfig.redisHostName)
val redisHostPort = this(WhiskConfig.redisHostPort)
val edgeHostName = this(WhiskConfig.edgeHostName)
- val zookeeperHost = this(WhiskConfig.zookeeperHostName) + ":" + this(WhiskConfig.zookeeperHostPort)
val invokerHosts = this(WhiskConfig.invokerHostsList)
+ val zookeeperHosts = this(WhiskConfig.zookeeperHostList)
val dbProvider = this(WhiskConfig.dbProvider)
val dbUsername = this(WhiskConfig.dbUsername)
@@ -103,6 +103,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
val kafkaTopicsCompletedRetentionBytes = this(WhiskConfig.kafkaTopicsCompletedRetentionBytes)
val kafkaTopicsCompletedRetentionMS = this(WhiskConfig.kafkaTopicsCompletedRetentionMS)
val kafkaTopicsCompletedSegmentBytes = this(WhiskConfig.kafkaTopicsCompletedSegmentBytes)
+ val kafkaReplicationFactor = this(WhiskConfig.kafkaReplicationFactor)
val runtimesManifest = this(WhiskConfig.runtimesManifest)
val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
@@ -224,20 +225,19 @@ object WhiskConfig {
val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
- val kafkaHostName = "kafka.host"
- val zookeeperHostName = "zookeeper.host"
+ val kafkaHostList = "kafka.hosts"
+ val zookeeperHostList = "zookeeper.hosts"
val redisHostName = "redis.host"
private val edgeHostApiPort = "edge.host.apiport"
- val kafkaHostPort = "kafka.host.port"
val redisHostPort = "redis.host.port"
- val zookeeperHostPort = "zookeeper.host.port"
val invokerHostsList = "invoker.hosts"
val edgeHost = Map(edgeHostName -> null, edgeHostApiPort -> null)
val invokerHosts = Map(invokerHostsList -> null)
- val kafkaHost = Map(kafkaHostName -> null, kafkaHostPort -> null)
+ val kafkaHosts = Map(kafkaHostList -> null)
+ val zookeeperHosts = Map(zookeeperHostList -> null)
val runtimesManifest = "runtimes.manifest"
@@ -247,6 +247,7 @@ object WhiskConfig {
val kafkaTopicsCompletedRetentionBytes = "kafka.topics.completed.retentionBytes"
val kafkaTopicsCompletedRetentionMS = "kafka.topics.completed.retentionMS"
val kafkaTopicsCompletedSegmentBytes = "kafka.topics.completed.segmentBytes"
+ val kafkaReplicationFactor = "kafka.replicationFactor"
val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
index 53d8e2f..de37c33 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
@@ -27,7 +27,7 @@ trait MessageProducer {
def sentCount(): Long
/** Sends msg to topic. This is an asynchronous operation. */
- def send(topic: String, msg: Message): Future[RecordMetadata]
+ def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata]
/** Closes producer. */
def close(): Unit
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index d41a0dc..80a6a90 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -217,7 +217,7 @@ object Controller {
"completed" + instance,
Map(
"numPartitions" -> "1",
- "replicationFactor" -> "1",
+ "replicationFactor" -> config.kafkaReplicationFactor,
"retention.bytes" -> config.kafkaTopicsCompletedRetentionBytes,
"retention.ms" -> config.kafkaTopicsCompletedRetentionMS,
"segment.bytes" -> config.kafkaTopicsCompletedSegmentBytes))) {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index e38f69c..be3bb0b 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -348,11 +348,12 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
object LoadBalancerService {
def requiredProperties =
- kafkaHost ++
+ kafkaHosts ++
Map(
kafkaTopicsCompletedRetentionBytes -> 1024.MB.toBytes.toString,
kafkaTopicsCompletedRetentionMS -> 1.hour.toMillis.toString,
- kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString) ++
+ kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString,
+ kafkaReplicationFactor -> "1") ++
Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
/** Memoizes the result of `f` for later use. */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1c6853a..1a71e29 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -61,12 +61,13 @@ object Invoker {
ExecManifest.requiredProperties ++
WhiskEntityStore.requiredProperties ++
WhiskActivationStore.requiredProperties ++
- kafkaHost ++
+ kafkaHosts ++
Map(
kafkaTopicsInvokerRetentionBytes -> 1024.MB.toBytes.toString,
kafkaTopicsInvokerRetentionMS -> 48.hour.toMillis.toString,
- kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString) ++
- Map(zookeeperHostName -> "", zookeeperHostPort -> "") ++
+ kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString,
+ kafkaReplicationFactor -> "1") ++
+ zookeeperHosts ++
wskApiHost ++ Map(
dockerImageTag -> "latest",
invokerNumCore -> "4",
@@ -135,17 +136,17 @@ object Invoker {
id
}
.getOrElse {
- if (config.zookeeperHost.startsWith(":") || config.zookeeperHost.endsWith(":")) {
- abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHost})")
+ if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
+ abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
}
val invokerName = cmdLineArgs.name.getOrElse(config.invokerName)
if (invokerName.trim.isEmpty) {
abort("Invoker name can't be empty to use dynamicId assignment.")
}
- logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHost}")
+ logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHosts}")
val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
- val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHost, retryPolicy)
+ val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHosts, retryPolicy)
zkClient.start()
zkClient.blockUntilConnected()
logger.info(this, "invokerReg: connected to zookeeper")
@@ -192,7 +193,7 @@ object Invoker {
"invoker" + assignedInvokerId,
Map(
"numPartitions" -> "1",
- "replicationFactor" -> "1",
+ "replicationFactor" -> config.kafkaReplicationFactor,
"retention.bytes" -> config.kafkaTopicsInvokerRetentionBytes,
"retention.ms" -> config.kafkaTopicsInvokerRetentionMS,
"segment.bytes" -> config.kafkaTopicsInvokerSegmentBytes))) {
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 781f0bf..0971b23 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -132,12 +132,8 @@ public class WhiskProperties {
return whiskProperties.getProperty(string);
}
- public static String getKafkaHost() {
- return whiskProperties.getProperty("kafka.host");
- }
-
- public static int getKafkaPort() {
- return Integer.parseInt(whiskProperties.getProperty("kafka.host.port"));
+ public static String getKafkaHosts() {
+ return whiskProperties.getProperty("kafka.hosts");
}
public static int getKafkaMonitorPort() {
@@ -145,11 +141,7 @@ public class WhiskProperties {
}
public static String getZookeeperHost() {
- return whiskProperties.getProperty("zookeeper.host");
- }
-
- public static int getZookeeperPort() {
- return Integer.parseInt(whiskProperties.getProperty("zookeeper.host.port"));
+ return whiskProperties.getProperty("zookeeper.hosts");
}
public static String getMainDockerEndpoint() {
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 20592a5..8974be3 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -17,43 +17,43 @@
package services
+import java.io.File
import java.util.Calendar
import scala.concurrent.Await
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.language.postfixOps
-
+import scala.util.Try
import org.apache.kafka.clients.consumer.CommitFailedException
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
-
-import common.StreamLogging
-import common.WskActorSystem
+import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
import whisk.common.TransactionId
import whisk.connector.kafka.KafkaConsumerConnector
import whisk.connector.kafka.KafkaProducerConnector
import whisk.core.WhiskConfig
import whisk.core.connector.Message
import whisk.utils.ExecutionContextFactory
+import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging {
implicit val transid = TransactionId.testing
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
- val config = new WhiskConfig(WhiskConfig.kafkaHost)
+ val config = new WhiskConfig(WhiskConfig.kafkaHosts)
assert(config.isValid)
val groupid = "kafkatest"
val topic = "Dinosaurs"
val sessionTimeout = 10 seconds
val maxPollInterval = 10 seconds
- val producer = new KafkaProducerConnector(config.kafkaHost, ec)
+ val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
val consumer = new KafkaConsumerConnector(
- config.kafkaHost,
+ config.kafkaHosts,
groupid,
topic,
sessionTimeout = sessionTimeout,
@@ -65,17 +65,34 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
super.afterAll()
}
+ def commandComponent(host: String, command: String, component: String) = {
+ def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
+ val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
+ val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort)
+ val cmd = Seq(docker, "--host", host + ":" + dockerPort, command, component)
+
+ TestUtils.runCmd(0, new File("."), cmd: _*)
+ }
+
+ def sendAndReceiveMessage(message: Message,
+ waitForSend: FiniteDuration,
+ waitForReceive: FiniteDuration): Iterable[String] = {
+ val start = java.lang.System.currentTimeMillis
+ val sent = Await.result(producer.send(topic, message), waitForSend)
+ val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
+ val end = java.lang.System.currentTimeMillis
+ val elapsed = end - start
+ println(s"Received ${received.size}. Took $elapsed msec: $received\n")
+
+ received
+ }
+
behavior of "Kafka connector"
it should "send and receive a kafka message which sets up the topic" in {
for (i <- 0 until 5) {
val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
- val start = java.lang.System.currentTimeMillis
- val sent = Await.result(producer.send(topic, message), 20 seconds)
- val received = consumer.peek(10 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
- val end = java.lang.System.currentTimeMillis
- val elapsed = end - start
- println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
+ val received = sendAndReceiveMessage(message, 20 seconds, 10 seconds)
received.size should be >= 1
received.last should be(message.serialize)
consumer.commit()
@@ -85,12 +102,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
it should "send and receive a kafka message even after session timeout" in {
for (i <- 0 until 4) {
val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
- val start = java.lang.System.currentTimeMillis
- val sent = Await.result(producer.send(topic, message), 1 seconds)
- val received = consumer.peek(1 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
- val end = java.lang.System.currentTimeMillis
- val elapsed = end - start
- println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
+ val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
// only the last iteration will have an updated cursor
// iteration 0: get whatever is on the topic (at least 1 but may be more if a previous test failed)
@@ -112,4 +124,32 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
} else consumer.commit()
}
}
+
+ it should "send and receive a kafka message even after shutdown one of instances" in {
+ val kafkaHosts = config.kafkaHosts.split(",")
+ if (kafkaHosts.length > 1) {
+ for (i <- 0 until kafkaHosts.length) {
+ val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+ val kafkaHost = kafkaHosts(i).split(":")(0)
+ val startLog = s", started"
+ val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
+
+ commandComponent(kafkaHost, "stop", s"kafka$i")
+ var received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
+ received.size should be(1)
+ consumer.commit()
+
+ commandComponent(kafkaHost, "start", s"kafka$i")
+ retry({
+ startLog.r
+ .findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout)
+ .length shouldBe prevCount + 1
+ }, 20, Some(1.second)) // wait until kafka is up
+
+ received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
+ received.size should be(1)
+ consumer.commit()
+ }
+ }
+ }
}
diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index 652dcb1..e3b9597 100644
--- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -73,7 +73,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
}
private val producer = new MessageProducer {
- def send(topic: String, msg: Message): Future[RecordMetadata] = {
+ def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata] = {
queue.synchronized {
if (queue.offer(msg)) {
logging.info(this, s"put: $msg")
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index eaa288b..b98b711 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -263,13 +263,13 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
it should "be aborted when exceeding its memory limits" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
val name = "TestNodeJsMemoryExceeding"
assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
- val allowedMemory = 256.megabytes
+ val allowedMemory = 128.megabytes
val actionName = TestUtils.getTestActionFilename("memoryWithGC.js")
(action, _) =>
action.create(name, Some(actionName), memory = Some(allowedMemory))
}
- val run = wsk.action.invoke(name, Map("payload" -> 512.toJson))
+ val run = wsk.action.invoke(name, Map("payload" -> 256.toJson))
withActivation(wsk.activation, run) {
_.response.result.get.fields("error") shouldBe Messages.memoryExhausted.toJson
}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].