You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/11/28 10:45:25 UTC
[GitHub] jeremiaswerner closed pull request #2744: Deploy kafka & zookeeper cluster with ansible
jeremiaswerner closed pull request #2744: Deploy kafka & zookeeper cluster with ansible
URL: https://github.com/apache/incubator-openwhisk/pull/2744
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (forked) pull request on
merge, it is reproduced below for the sake of provenance:
diff --git a/ansible/environments/distributed/hosts b/ansible/environments/distributed/hosts
index 0880e6d77c..9c9045b01e 100755
--- a/ansible/environments/distributed/hosts
+++ b/ansible/environments/distributed/hosts
@@ -18,9 +18,12 @@ edge
[controllers]
10.3.2.155 ansible_host=10.3.2.155
-[kafka]
+[kafkas]
10.3.2.156 ansible_host=10.3.2.156
+[zookeepers:children]
+kafkas
+
[invokers]
10.3.2.158 ansible_host=10.3.2.158
10.3.2.159 ansible_host=10.3.2.159
diff --git a/ansible/environments/docker-machine/hosts.j2.ini b/ansible/environments/docker-machine/hosts.j2.ini
index 4301576768..a4990ae359 100644
--- a/ansible/environments/docker-machine/hosts.j2.ini
+++ b/ansible/environments/docker-machine/hosts.j2.ini
@@ -10,9 +10,12 @@ ansible ansible_connection=local
controller0 ansible_host={{ docker_machine_ip }}
controller1 ansible_host={{ docker_machine_ip }}
-[kafka]
+[kafkas]
{{ docker_machine_ip }} ansible_host={{ docker_machine_ip }}
+[zookeepers:children]
+kafkas
+
[invokers]
invoker0 ansible_host={{ docker_machine_ip }}
invoker1 ansible_host={{ docker_machine_ip }}
diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all
index 3860f33a2a..c3b3c12160 100755
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -23,3 +23,12 @@ controller_arguments: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxre
invoker_arguments: "{{ controller_arguments }}"
invoker_allow_multiple_instances: true
+
+# Set kafka topic retention
+kafka_heap: '512m'
+kafka_topics_completed_retentionBytes: 104857600
+kafka_topics_completed_retentionMS: 300000
+kafka_topics_health_retentionBytes: 104857600
+kafka_topics_health_retentionMS: 300000
+kafka_topics_invoker_retentionBytes: 104857600
+kafka_topics_invoker_retentionMS: 300000
diff --git a/ansible/environments/local/hosts b/ansible/environments/local/hosts
index 00702f397d..516340861a 100644
--- a/ansible/environments/local/hosts
+++ b/ansible/environments/local/hosts
@@ -10,8 +10,12 @@ ansible ansible_connection=local
controller0 ansible_host=172.17.0.1 ansible_connection=local
controller1 ansible_host=172.17.0.1 ansible_connection=local
-[kafka]
-172.17.0.1 ansible_host=172.17.0.1 ansible_connection=local
+[kafkas]
+kafka0 ansible_host=172.17.0.1 ansible_connection=local
+kafka1 ansible_host=172.17.0.1 ansible_connection=local
+
+[zookeepers:children]
+kafkas
[invokers]
invoker0 ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 02c2b08ad1..f1a746d5b1 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -129,33 +129,46 @@ registry:
confdir: "{{ config_root_dir }}/registry"
kafka:
- version: 0.10.2.1
+ version: 0.11.0.1
port: 9092
ras:
- port: 9093
+ port: 8093
heap: "{{ kafka_heap | default('1g') }}"
+ replicationFactor: "{{ kafka_replicationFactor | default((groups['kafkas']|length)|int) }}"
topics:
completed:
segmentBytes: 536870912
retentionBytes: "{{ kafka_topics_completed_retentionBytes | default(1073741824) }}"
- retentionMS: 3600000
+ retentionMS: "{{ kafka_topics_completed_retentionMS | default(3600000) }}"
health:
segmentBytes: 536870912
retentionBytes: "{{ kafka_topics_health_retentionBytes | default(1073741824) }}"
- retentionMS: 3600000
+ retentionMS: "{{ kafka_topics_health_retentionMS | default(3600000) }}"
invoker:
segmentBytes: 536870912
retentionBytes: "{{ kafka_topics_invoker_retentionBytes | default(1073741824) }}"
- retentionMS: 172800000
+ retentionMS: "{{ kafka_topics_invoker_retentionMS | default(172800000) }}"
cacheInvalidation:
segmentBytes: 536870912
- retentionBytes: 1073741824
- retentionMS: 300000
+ retentionBytes: "{{ kafka_topics_cacheInvalidation_retentionBytes | default(1073741824) }}"
+ retentionMS: "{{ kafka_topics_cacheInvalidation_retentionMS | default(300000) }}"
+
+kafka_connect_string: "{% set ret = [] %}\
+ {% for host in groups['kafkas'] %}\
+ {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.port+loop.index-1)|string) ) }}\
+ {% endfor %}\
+ {{ ret | join(',') }}"
zookeeper:
version: 3.4
port: 2181
+zookeeper_connect_string: "{% set ret = [] %}\
+ {% for host in groups['zookeepers'] %}\
+ {{ ret.append( hostvars[host].ansible_host + ':' + ((zookeeper.port+loop.index-1)|string) ) }}\
+ {% endfor %}\
+ {{ ret | join(',') }}"
+
invoker:
port: 12001
heap: "{{ invoker_heap | default('2g') }}"
diff --git a/ansible/kafka.yml b/ansible/kafka.yml
index 1b011b5ae3..fba52f2627 100644
--- a/ansible/kafka.yml
+++ b/ansible/kafka.yml
@@ -1,6 +1,10 @@
---
# This playbook deploys an Openwhisk Kafka bus.
-- hosts: kafka
+- hosts: zookeepers
+ roles:
+ - zookeeper
+
+- hosts: kafkas
roles:
- kafka
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 3978316a07..2aa0b56b73 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -51,11 +51,11 @@
"WHISK_VERSION_DATE": "{{ whisk.version.date }}"
"WHISK_VERSION_BUILDNO": "{{ docker.image.tag }}"
- "KAFKA_HOST": "{{ groups['kafka']|first }}"
- "KAFKA_HOST_PORT": "{{ kafka.port }}"
+ "KAFKA_HOSTS": "{{ kafka_connect_string }}"
"KAFKA_TOPICS_COMPLETED_RETENTION_BYTES": "{{ kafka.topics.completed.retentionBytes }}"
"KAFKA_TOPICS_COMPLETED_RETENTION_MS": "{{ kafka.topics.completed.retentionMS }}"
"KAFKA_TOPICS_COMPLETED_SEGMENT_BYTES": "{{ kafka.topics.completed.segmentBytes }}"
+ "KAFKA_REPLICATIONFACTOR": "{{ kafka.replicationFactor }}"
"DB_PROTOCOL": "{{ db_protocol }}"
"DB_PROVIDER": "{{ db_provider }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 94c06a3d2d..56b810015f 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -117,13 +117,12 @@
-e INVOKER_OPTS='{{ invoker.arguments }}'
-e COMPONENT_NAME='invoker{{ groups['invokers'].index(inventory_hostname) }}'
-e PORT='8080'
- -e KAFKA_HOST='{{ groups['kafka']|first }}'
- -e KAFKA_HOST_PORT='{{ kafka.port }}'
+ -e KAFKA_HOSTS='{{ kafka_connect_string }}'
-e KAFKA_TOPICS_INVOKER_RETENTION_BYTES='{{ kafka.topics.invoker.retentionBytes }}'
-e KAFKA_TOPICS_INVOKER_RETENTION_MS='{{ kafka.topics.invoker.retentionMS }}'
-e KAFKA_TOPICS_INVOKER_SEGMENT_BYTES='{{ kafka.topics.invoker.segmentBytes }}'
- -e ZOOKEEPER_HOST='{{ groups['kafka']|first }}'
- -e ZOOKEEPER_HOST_PORT='{{ zookeeper.port }}'
+ -e KAFKA_REPLICATIONFACTOR='{{ kafka.replicationFactor }}'
+ -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
-e DB_PROTOCOL='{{ db_protocol }}'
-e DB_PROVIDER='{{ db_provider }}'
-e DB_HOST='{{ db_host }}'
diff --git a/ansible/roles/kafka/tasks/clean.yml b/ansible/roles/kafka/tasks/clean.yml
index fa1db75d0f..b9d593381c 100644
--- a/ansible/roles/kafka/tasks/clean.yml
+++ b/ansible/roles/kafka/tasks/clean.yml
@@ -1,7 +1,7 @@
---
# Remove kafka and zookeeper containers.
-- name: remove kafka
+- name: remove old kafka
docker_container:
name: kafka
image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
@@ -9,10 +9,10 @@
state: absent
ignore_errors: True
-- name: remove zookeeper
+- name: remove kafka
docker_container:
- name: zookeeper
- image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
+ name: kafka{{ groups['kafkas'].index(inventory_hostname) }}
+ image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
keep_volumes: False
state: absent
ignore_errors: True
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index f410cc3502..3d5845b164 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -1,57 +1,40 @@
---
# This role will install Kafka with Zookeeper in group 'kafka' in the environment inventory
-- name: "pull the zookeeper:{{ zookeeper.version }} image"
- shell: "docker pull zookeeper:{{ zookeeper.version }}"
- retries: "{{ docker.pull.retries }}"
- delay: "{{ docker.pull.delay }}"
-
-- name: (re)start zookeeper
- docker_container:
- name: zookeeper
- image: zookeeper:{{ zookeeper.version }}
- state: started
- recreate: true
- restart_policy: "{{ docker.restart.policy }}"
- ports:
- - "{{ zookeeper.port }}:2181"
-
-- name: wait until the Zookeeper in this host is up and running
- action: shell (echo ruok; sleep 1) | nc {{ ansible_host }} {{ zookeeper.port }}
- register: result
- until: (result.rc == 0) and (result.stdout == 'imok')
- retries: 36
- delay: 5
-
-- name: "pull the ches/kafka:{{ kafka.version }} image"
- shell: "docker pull ches/kafka:{{ kafka.version }}"
+- name: "pull the wurstmeister/kafka:{{ kafka.version }} image"
+ shell: "docker pull wurstmeister/kafka:{{ kafka.version }}"
retries: "{{ docker.pull.retries }}"
delay: "{{ docker.pull.delay }}"
- name: (re)start kafka
+ vars:
+ zookeeper_idx: "{{ groups['kafkas'].index(inventory_hostname) % (groups['zookeepers'] | length) }}"
docker_container:
- name: kafka
- image: ches/kafka:{{ kafka.version }}
+ name: kafka{{ groups['kafkas'].index(inventory_hostname) }}
+ image: wurstmeister/kafka:{{ kafka.version }}
state: started
recreate: true
restart_policy: "{{ docker.restart.policy }}"
- links:
- - "zookeeper:zookeeper"
env:
+ "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+ "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
+ "KAFKA_ADVERTISED_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
+ "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
+ "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
ports:
- - "{{ kafka.port }}:9092"
+ - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
- name: wait until the kafka server started up
- shell: docker logs kafka
+ shell: docker logs kafka{{ groups['kafkas'].index(inventory_hostname) }}
register: result
- until: ('[Kafka Server 0], started' in result.stdout)
+ until: (('[Kafka Server ' + (groups['kafkas'].index(inventory_hostname)|string) + '], started') in result.stdout)
retries: 10
delay: 5
- name: create the health and the cacheInvalidation topic
- shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
+ shell: "docker exec kafka0 bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor {{ kafka.replicationFactor }} --partitions 1 --zookeeper $KAFKA_ZOOKEEPER_CONNECT --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'"
register: command_result
failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
changed_when: "'Created topic' in command_result.stdout"
@@ -60,3 +43,4 @@
settings: "{{ kafka.topics.health }}"
- name: cacheInvalidation
settings: "{{ kafka.topics.cacheInvalidation }}"
+ when: groups['kafkas'].index(inventory_hostname ) == 0
diff --git a/ansible/roles/zookeeper/tasks/clean.yml b/ansible/roles/zookeeper/tasks/clean.yml
new file mode 100644
index 0000000000..df82ab100c
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/clean.yml
@@ -0,0 +1,18 @@
+---
+# Remove zookeeper containers.
+
+- name: remove old zookeeper
+ docker_container:
+ name: zookeeper
+ image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
+ keep_volumes: False
+ state: absent
+ ignore_errors: True
+
+- name: remove zookeeper
+ docker_container:
+ name: zookeeper{{ groups['zookeepers'].index(inventory_hostname) }}
+ image: "{{ docker_registry }}{{ docker.image.prefix }}/zookeeper:{{ docker.image.tag }}"
+ keep_volumes: False
+ state: absent
+ ignore_errors: True
diff --git a/ansible/roles/zookeeper/tasks/deploy.yml b/ansible/roles/zookeeper/tasks/deploy.yml
new file mode 100644
index 0000000000..9d88911a36
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/deploy.yml
@@ -0,0 +1,37 @@
+---
+# This role will install Zookeeper on the hosts in group 'zookeepers' in the environment inventory
+
+- name: "pull the zookeeper:{{ zookeeper.version }} image"
+ shell: "docker pull zookeeper:{{ zookeeper.version }}"
+ retries: "{{ docker.pull.retries }}"
+ delay: "{{ docker.pull.delay }}"
+
+- name: (re)start zookeeper
+ docker_container:
+ name: zookeeper{{ groups['zookeepers'].index(inventory_hostname) }}
+ image: zookeeper:{{ zookeeper.version }}
+ state: started
+ recreate: true
+ restart_policy: "{{ docker.restart.policy }}"
+ env:
+ ZOO_MY_ID: "{{ groups['zookeepers'].index(inventory_hostname) + 1 }}"
+ ZOO_SERVERS: "{% set zhosts = [] %}
+ {% for host in groups['zookeepers'] %}
+ {% if host == inventory_hostname %}
+ {{ zhosts.append('server.' + (loop.index|string) + '=' + '0.0.0.0:2888:3888') }}
+ {% else %}
+ {{ zhosts.append('server.' + (loop.index|string) + '=' + hostvars[host].ansible_host + ':' + ((2888+loop.index-1)|string) + ':' + ((3888+loop.index-1)|string) ) }}
+ {% endif %}
+ {% endfor %}
+ {{ zhosts | join(' ') }}"
+ ports:
+ - "{{ zookeeper.port + groups['zookeepers'].index(inventory_hostname) }}:2181"
+ - "{{ 2888 + groups['zookeepers'].index(inventory_hostname) }}:2888"
+ - "{{ 3888 + groups['zookeepers'].index(inventory_hostname) }}:3888"
+
+- name: wait until the Zookeeper in this host is up and running
+ action: shell (echo ruok; sleep 1) | nc {{ ansible_host }} {{ zookeeper.port + groups['zookeepers'].index(inventory_hostname) }}
+ register: result
+ until: (result.rc == 0) and (result.stdout == 'imok')
+ retries: 36
+ delay: 5
diff --git a/ansible/roles/zookeeper/tasks/main.yml b/ansible/roles/zookeeper/tasks/main.yml
new file mode 100644
index 0000000000..0140e39bf2
--- /dev/null
+++ b/ansible/roles/zookeeper/tasks/main.yml
@@ -0,0 +1,10 @@
+---
+# This role will install Zookeeper on the hosts in group 'zookeepers' in the environment inventory.
+# In deploy mode it will deploy the Zookeeper containers.
+# In clean mode it will remove the Zookeeper containers.
+
+- include: deploy.yml
+ when: mode == "deploy"
+
+- include: clean.yml
+ when: mode == "clean"
\ No newline at end of file
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index ecef131f90..8ae7b8c0b0 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -42,18 +42,16 @@ limits.triggers.fires.perMinute={{ limits.firesPerMinute }}
limits.actions.sequence.maxLength={{ limits.sequenceMaxLength }}
edge.host={{ groups["edge"]|first }}
-kafka.host={{ groups["kafka"]|first }}
+kafka.hosts={{ kafka_connect_string }}
redis.host={{ groups["redis"]|default([""])|first }}
router.host={{ groups["edge"]|first }}
-zookeeper.host={{ groups["kafka"]|first }}
+zookeeper.hosts={{ zookeeper_connect_string }}
invoker.hosts={{ groups["invokers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
edge.host.apiport=443
-zookeeper.host.port={{ zookeeper.port }}
-kafka.host.port={{ kafka.port }}
kafkaras.host.port={{ kafka.ras.port }}
redis.host.port={{ redis.port }}
-invoker.hosts.baseport={{ invoker.port }}
+invoker.hosts.basePort={{ invoker.port }}
controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
controller.host.basePort={{ controller.basePort }}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index fc6b593044..5c61cc6451 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -41,15 +41,15 @@ import whisk.core.connector.MessagingProvider
object KafkaMessagingProvider extends MessagingProvider {
def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
implicit logging: Logging): MessageConsumer =
- new KafkaConsumerConnector(config.kafkaHost, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
+ new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
- new KafkaProducerConnector(config.kafkaHost, ec)
+ new KafkaProducerConnector(config.kafkaHosts, ec)
def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])(
implicit logging: Logging): Boolean = {
val props = new Properties
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHost)
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
val client = AdminClient.create(props)
val numPartitions = topicConfig.getOrElse("numPartitions", "1").toInt
val replicationFactor = topicConfig.getOrElse("replicationFactor", "1").toShort
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 8e8289d759..18b38249bd 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.errors.NotLeaderForPartitionException
import org.apache.kafka.common.serialization.StringSerializer
import whisk.common.Counter
import whisk.common.Logging
@@ -36,7 +37,7 @@ import whisk.core.connector.Message
import whisk.core.connector.MessageProducer
import whisk.core.entity.UUIDs
-class KafkaProducerConnector(kafkahost: String,
+class KafkaProducerConnector(kafkahosts: String,
implicit val executionContext: ExecutionContext,
id: String = UUIDs.randomUUID().toString)(implicit logging: Logging)
extends MessageProducer {
@@ -44,12 +45,13 @@ class KafkaProducerConnector(kafkahost: String,
override def sentCount() = sentCounter.cur
/** Sends msg to topic. This is an asynchronous operation. */
- override def send(topic: String, msg: Message): Future[RecordMetadata] = {
+ override def send(topic: String, msg: Message, retry: Int = 2): Future[RecordMetadata] = {
implicit val transid = msg.transid
val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
logging.debug(this, s"sending to topic '$topic' msg '$msg'")
val produced = Promise[RecordMetadata]()
+
producer.send(record, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception == null) produced.success(metadata)
@@ -63,6 +65,13 @@ class KafkaProducerConnector(kafkahost: String,
sentCounter.next()
case Failure(t) =>
logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")
+ } recoverWith {
+ case t: NotLeaderForPartitionException =>
+ if (retry > 0) {
+ logging.error(this, s"NotLeaderForPartitionException is retryable, remain $retry retry")
+ Thread.sleep(100)
+ send(topic, msg, retry - 1)
+ } else produced.future
}
}
@@ -76,7 +85,7 @@ class KafkaProducerConnector(kafkahost: String,
private def getProps: Properties = {
val props = new Properties
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
props.put(ProducerConfig.ACKS_CONFIG, 1.toString)
props
}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7a724d0568..c08fcf13f1 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -75,14 +75,14 @@ class WhiskConfig(requiredProperties: Map[String, String],
val controllerInstances = this(WhiskConfig.controllerInstances)
val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort)
- val kafkaHost = this(WhiskConfig.kafkaHostName) + ":" + this(WhiskConfig.kafkaHostPort)
+ val kafkaHosts = this(WhiskConfig.kafkaHostList)
val redisHostName = this(WhiskConfig.redisHostName)
val redisHostPort = this(WhiskConfig.redisHostPort)
val edgeHostName = this(WhiskConfig.edgeHostName)
- val zookeeperHost = this(WhiskConfig.zookeeperHostName) + ":" + this(WhiskConfig.zookeeperHostPort)
val invokerHosts = this(WhiskConfig.invokerHostsList)
+ val zookeeperHosts = this(WhiskConfig.zookeeperHostList)
val dbProvider = this(WhiskConfig.dbProvider)
val dbUsername = this(WhiskConfig.dbUsername)
@@ -103,6 +103,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
val kafkaTopicsCompletedRetentionBytes = this(WhiskConfig.kafkaTopicsCompletedRetentionBytes)
val kafkaTopicsCompletedRetentionMS = this(WhiskConfig.kafkaTopicsCompletedRetentionMS)
val kafkaTopicsCompletedSegmentBytes = this(WhiskConfig.kafkaTopicsCompletedSegmentBytes)
+ val kafkaReplicationFactor = this(WhiskConfig.kafkaReplicationFactor)
val runtimesManifest = this(WhiskConfig.runtimesManifest)
val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
@@ -224,20 +225,19 @@ object WhiskConfig {
val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
- val kafkaHostName = "kafka.host"
- val zookeeperHostName = "zookeeper.host"
+ val kafkaHostList = "kafka.hosts"
+ val zookeeperHostList = "zookeeper.hosts"
val redisHostName = "redis.host"
private val edgeHostApiPort = "edge.host.apiport"
- val kafkaHostPort = "kafka.host.port"
val redisHostPort = "redis.host.port"
- val zookeeperHostPort = "zookeeper.host.port"
val invokerHostsList = "invoker.hosts"
val edgeHost = Map(edgeHostName -> null, edgeHostApiPort -> null)
val invokerHosts = Map(invokerHostsList -> null)
- val kafkaHost = Map(kafkaHostName -> null, kafkaHostPort -> null)
+ val kafkaHosts = Map(kafkaHostList -> null)
+ val zookeeperHosts = Map(zookeeperHostList -> null)
val runtimesManifest = "runtimes.manifest"
@@ -247,6 +247,7 @@ object WhiskConfig {
val kafkaTopicsCompletedRetentionBytes = "kafka.topics.completed.retentionBytes"
val kafkaTopicsCompletedRetentionMS = "kafka.topics.completed.retentionMS"
val kafkaTopicsCompletedSegmentBytes = "kafka.topics.completed.segmentBytes"
+ val kafkaReplicationFactor = "kafka.replicationFactor"
val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
index 53d8e2f429..de37c33917 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageProducer.scala
@@ -27,7 +27,7 @@ trait MessageProducer {
def sentCount(): Long
/** Sends msg to topic. This is an asynchronous operation. */
- def send(topic: String, msg: Message): Future[RecordMetadata]
+ def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata]
/** Closes producer. */
def close(): Unit
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index d41a0dc1a8..80a6a90fba 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -217,7 +217,7 @@ object Controller {
"completed" + instance,
Map(
"numPartitions" -> "1",
- "replicationFactor" -> "1",
+ "replicationFactor" -> config.kafkaReplicationFactor,
"retention.bytes" -> config.kafkaTopicsCompletedRetentionBytes,
"retention.ms" -> config.kafkaTopicsCompletedRetentionMS,
"segment.bytes" -> config.kafkaTopicsCompletedSegmentBytes))) {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index e38f69c9af..be3bb0b09e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -348,11 +348,12 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
object LoadBalancerService {
def requiredProperties =
- kafkaHost ++
+ kafkaHosts ++
Map(
kafkaTopicsCompletedRetentionBytes -> 1024.MB.toBytes.toString,
kafkaTopicsCompletedRetentionMS -> 1.hour.toMillis.toString,
- kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString) ++
+ kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString,
+ kafkaReplicationFactor -> "1") ++
Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
/** Memoizes the result of `f` for later use. */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1c6853a96c..1a71e29379 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -61,12 +61,13 @@ object Invoker {
ExecManifest.requiredProperties ++
WhiskEntityStore.requiredProperties ++
WhiskActivationStore.requiredProperties ++
- kafkaHost ++
+ kafkaHosts ++
Map(
kafkaTopicsInvokerRetentionBytes -> 1024.MB.toBytes.toString,
kafkaTopicsInvokerRetentionMS -> 48.hour.toMillis.toString,
- kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString) ++
- Map(zookeeperHostName -> "", zookeeperHostPort -> "") ++
+ kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString,
+ kafkaReplicationFactor -> "1") ++
+ zookeeperHosts ++
wskApiHost ++ Map(
dockerImageTag -> "latest",
invokerNumCore -> "4",
@@ -135,17 +136,17 @@ object Invoker {
id
}
.getOrElse {
- if (config.zookeeperHost.startsWith(":") || config.zookeeperHost.endsWith(":")) {
- abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHost})")
+ if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
+ abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
}
val invokerName = cmdLineArgs.name.getOrElse(config.invokerName)
if (invokerName.trim.isEmpty) {
abort("Invoker name can't be empty to use dynamicId assignment.")
}
- logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHost}")
+ logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHosts}")
val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
- val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHost, retryPolicy)
+ val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHosts, retryPolicy)
zkClient.start()
zkClient.blockUntilConnected()
logger.info(this, "invokerReg: connected to zookeeper")
@@ -192,7 +193,7 @@ object Invoker {
"invoker" + assignedInvokerId,
Map(
"numPartitions" -> "1",
- "replicationFactor" -> "1",
+ "replicationFactor" -> config.kafkaReplicationFactor,
"retention.bytes" -> config.kafkaTopicsInvokerRetentionBytes,
"retention.ms" -> config.kafkaTopicsInvokerRetentionMS,
"segment.bytes" -> config.kafkaTopicsInvokerSegmentBytes))) {
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 781f0bf91e..0971b23f91 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -132,12 +132,8 @@ public static String getProperty(String string) {
return whiskProperties.getProperty(string);
}
- public static String getKafkaHost() {
- return whiskProperties.getProperty("kafka.host");
- }
-
- public static int getKafkaPort() {
- return Integer.parseInt(whiskProperties.getProperty("kafka.host.port"));
+ public static String getKafkaHosts() {
+ return whiskProperties.getProperty("kafka.hosts");
}
public static int getKafkaMonitorPort() {
@@ -145,11 +141,7 @@ public static int getKafkaMonitorPort() {
}
public static String getZookeeperHost() {
- return whiskProperties.getProperty("zookeeper.host");
- }
-
- public static int getZookeeperPort() {
- return Integer.parseInt(whiskProperties.getProperty("zookeeper.host.port"));
+ return whiskProperties.getProperty("zookeeper.hosts");
}
public static String getMainDockerEndpoint() {
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 20592a5c38..8974be3c79 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -17,43 +17,43 @@
package services
+import java.io.File
import java.util.Calendar
import scala.concurrent.Await
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.language.postfixOps
-
+import scala.util.Try
import org.apache.kafka.clients.consumer.CommitFailedException
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
-
-import common.StreamLogging
-import common.WskActorSystem
+import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
import whisk.common.TransactionId
import whisk.connector.kafka.KafkaConsumerConnector
import whisk.connector.kafka.KafkaProducerConnector
import whisk.core.WhiskConfig
import whisk.core.connector.Message
import whisk.utils.ExecutionContextFactory
+import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging {
implicit val transid = TransactionId.testing
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
- val config = new WhiskConfig(WhiskConfig.kafkaHost)
+ val config = new WhiskConfig(WhiskConfig.kafkaHosts)
assert(config.isValid)
val groupid = "kafkatest"
val topic = "Dinosaurs"
val sessionTimeout = 10 seconds
val maxPollInterval = 10 seconds
- val producer = new KafkaProducerConnector(config.kafkaHost, ec)
+ val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
val consumer = new KafkaConsumerConnector(
- config.kafkaHost,
+ config.kafkaHosts,
groupid,
topic,
sessionTimeout = sessionTimeout,
@@ -65,17 +65,34 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
super.afterAll()
}
+ def commandComponent(host: String, command: String, component: String) = {
+ def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
+ val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
+ val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort)
+ val cmd = Seq(docker, "--host", host + ":" + dockerPort, command, component)
+
+ TestUtils.runCmd(0, new File("."), cmd: _*)
+ }
+
+ def sendAndReceiveMessage(message: Message,
+ waitForSend: FiniteDuration,
+ waitForReceive: FiniteDuration): Iterable[String] = {
+ val start = java.lang.System.currentTimeMillis
+ val sent = Await.result(producer.send(topic, message), waitForSend)
+ val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
+ val end = java.lang.System.currentTimeMillis
+ val elapsed = end - start
+ println(s"Received ${received.size}. Took $elapsed msec: $received\n")
+
+ received
+ }
+
behavior of "Kafka connector"
it should "send and receive a kafka message which sets up the topic" in {
for (i <- 0 until 5) {
val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
- val start = java.lang.System.currentTimeMillis
- val sent = Await.result(producer.send(topic, message), 20 seconds)
- val received = consumer.peek(10 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
- val end = java.lang.System.currentTimeMillis
- val elapsed = end - start
- println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
+ val received = sendAndReceiveMessage(message, 20 seconds, 10 seconds)
received.size should be >= 1
received.last should be(message.serialize)
consumer.commit()
@@ -85,12 +102,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
it should "send and receive a kafka message even after session timeout" in {
for (i <- 0 until 4) {
val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
- val start = java.lang.System.currentTimeMillis
- val sent = Await.result(producer.send(topic, message), 1 seconds)
- val received = consumer.peek(1 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
- val end = java.lang.System.currentTimeMillis
- val elapsed = end - start
- println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
+ val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
// only the last iteration will have an updated cursor
// iteration 0: get whatever is on the topic (at least 1 but may be more if a previous test failed)
@@ -112,4 +124,32 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
} else consumer.commit()
}
}
+
+ it should "send and receive a kafka message even after shutdown one of instances" in {
+ val kafkaHosts = config.kafkaHosts.split(",")
+ if (kafkaHosts.length > 1) {
+ for (i <- 0 until kafkaHosts.length) {
+ val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+ val kafkaHost = kafkaHosts(i).split(":")(0)
+ val startLog = s", started"
+ val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
+
+ commandComponent(kafkaHost, "stop", s"kafka$i")
+ var received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
+ received.size should be(1)
+ consumer.commit()
+
+ commandComponent(kafkaHost, "start", s"kafka$i")
+ retry({
+ startLog.r
+ .findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout)
+ .length shouldBe prevCount + 1
+ }, 20, Some(1.second)) // wait until kafka is up
+
+ received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
+ received.size should be(1)
+ consumer.commit()
+ }
+ }
+ }
}
diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index 652dcb1542..e3b9597a3f 100644
--- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -73,7 +73,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
}
private val producer = new MessageProducer {
- def send(topic: String, msg: Message): Future[RecordMetadata] = {
+ def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata] = {
queue.synchronized {
if (queue.offer(msg)) {
logging.info(this, s"put: $msg")
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index eaa288b5a8..b98b711e65 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -263,13 +263,13 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
it should "be aborted when exceeding its memory limits" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
val name = "TestNodeJsMemoryExceeding"
assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
- val allowedMemory = 256.megabytes
+ val allowedMemory = 128.megabytes
val actionName = TestUtils.getTestActionFilename("memoryWithGC.js")
(action, _) =>
action.create(name, Some(actionName), memory = Some(allowedMemory))
}
- val run = wsk.action.invoke(name, Map("payload" -> 512.toJson))
+ val run = wsk.action.invoke(name, Map("payload" -> 256.toJson))
withActivation(wsk.activation, run) {
_.response.result.get.fields("error") shouldBe Messages.memoryExhausted.toJson
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services