You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/11/07 19:19:48 UTC

[GitHub] rabbah closed pull request #2545: Kafka topics are created via Docker Images

rabbah closed pull request #2545: Kafka topics are created via Docker Images
URL: https://github.com/apache/incubator-openwhisk/pull/2545
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not show it otherwise:

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index dd59815a55..15fe0d9b8b 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -111,7 +111,8 @@ registry:
   confdir: "{{ config_root_dir }}/registry"
 
 kafka:
-  version: 0.10.2.1
+  replication_factor: "{{ kafka_replication_factor | default(1) }}"
+  partitions: "{{ kafka_partitions | default(1) }}"
   port: 9092
   ras:
     port: 9093
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index ec2f66dbd8..a33bfcced2 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -57,11 +57,19 @@
       "LOADBALANCER_INVOKERBUSYTHRESHOLD": "{{ invoker.busyThreshold }}"
 
       "RUNTIMES_MANIFEST": "{{ runtimesManifest | to_json }}"
+
+      "REPLICATION_FACTOR": "{{ kafka.replication_factor}}"
+      "PARTITIONS": "{{ kafka.partitions }}"
+      "ZOOKEEPER_HOST": "{{ groups['kafka']|first }}"
+      "ZOOKEEPER_PORT": "{{ zookeeper.port }}"
+      "KAFKA_TOPICS_COMPLETED_RETENTIONBYTES": "{{ kafka.topics.completed.retentionBytes }}"
+      "KAFKA_TOPICS_COMPLETED_RETENTIONMS": "{{ kafka.topics.completed.retentionMS }}"
+      "KAFKA_TOPICS_COMPLETED_SEGMENTBYTES": "{{ kafka.topics.completed.segmentBytes }}"
     volumes:
       - "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs"
     ports:
       - "{{ controller.basePort + groups['controllers'].index(inventory_hostname) }}:8080"
-    command: /bin/sh -c "controller/bin/controller {{ groups['controllers'].index(inventory_hostname) }} >> /logs/controller{{ groups['controllers'].index(inventory_hostname) }}_logs.log 2>&1"
+    command: /bin/sh -c "/controller.sh {{ groups['controllers'].index(inventory_hostname) }} >> /logs/controller{{ groups['controllers'].index(inventory_hostname) }}_logs.log 2>&1"
 
 - name: wait until the Controller in this host is up and running
   uri:
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 04258abd03..99493513c8 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -96,6 +96,13 @@
         -e INVOKER_NUMCORE='{{ invoker.numcore }}'
         -e INVOKER_CORESHARE='{{ invoker.coreshare }}'
         -e WHISK_LOGS_DIR='{{ whisk_logs_dir }}'
+        -e REPLICATION_FACTOR={{ kafka.replication_factor}}
+        -e PARTITIONS={{ kafka.partitions }}
+        -e ZOOKEEPER_HOST={{ groups['kafka']|first }}
+        -e ZOOKEEPER_PORT={{ zookeeper.port }}
+        -e KAFKA_TOPICS_INVOKER_RETENTIONBYTES={{ kafka.topics.invoker.retentionBytes }}
+        -e KAFKA_TOPICS_INVOKER_RETENTIONMS={{ kafka.topics.invoker.retentionMS }}
+        -e KAFKA_TOPICS_INVOKER_SEGMENTBYTES={{ kafka.topics.invoker.segmentBytes }}
         -v /sys/fs/cgroup:/sys/fs/cgroup
         -v /run/runc:/run/runc
         -v {{ whisk_logs_dir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}:/logs
@@ -103,7 +110,8 @@
         -v {{ docker_sock | default('/var/run/docker.sock') }}:/var/run/docker.sock
         -p {{ invoker.port + groups['invokers'].index(inventory_hostname) }}:8080
         {{ docker_registry }}{{ docker.image.prefix }}/invoker:{{ docker.image.tag }}
-        /bin/sh -c "exec /invoker/bin/invoker {{ groups['invokers'].index(inventory_hostname) }} >> /logs/invoker{{ groups['invokers'].index(inventory_hostname) }}_logs.log 2>&1"
+        /bin/sh -c "/invoker.sh {{ groups['invokers'].index(inventory_hostname) }} >> /logs/invoker{{ groups['invokers'].index(inventory_hostname) }}_logs.log 2>&1"
+  when: invokerInfo.json|length == 0
 
 # todo: re-enable docker_container module once https://github.com/ansible/ansible-modules-core/issues/5054 is resolved
 
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index 615c496bde..63a58f2950 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -23,15 +23,17 @@
   retries: 36
   delay: 5
 
-- name: "pull the ches/kafka:{{ kafka.version }} image"
-  shell: "docker pull ches/kafka:{{ kafka.version }}"
+
+- name: "pull the {{ docker.image.tag }} image of kafka"
+  shell: "docker pull {{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
+  when: docker_registry != ""
   retries: "{{ docker.pull.retries }}"
   delay: "{{ docker.pull.delay }}"
 
 - name: (re)start kafka
   docker_container:
     name: kafka
-    image: ches/kafka:{{ kafka.version }}
+    image: "{{ docker_registry }}{{ docker.image.prefix }}/kafka:{{ docker.image.tag }}"
     state: started
     recreate: true
     restart_policy: "{{ docker.restart.policy }}"
@@ -40,35 +42,19 @@
     env:
       "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
       "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
+      "REPLICATION_FACTOR": "{{ kafka.replication_factor}}"
+      "PARTITIONS": "{{ kafka.partitions }}"
+      "ZOOKEEPER_HOST": "{{ groups['kafka']|first }}"
+      "ZOOKEEPER_PORT": "{{ zookeeper.port }}"
+      "KAFKA_TOPICS_HEALTH_RETENTIONBYTES": "{{ kafka.topics.health.retentionBytes }}"
+      "KAFKA_TOPICS_HEALTH_RETENTIONMS": "{{ kafka.topics.health.retentionMS }}"
+      "KAFKA_TOPICS_HEALTH_SEGMENTBYTES": "{{ kafka.topics.health.segmentBytes }}"
     ports:
       - "{{ kafka.port }}:9092"
 
 - name: wait until the kafka server started up
   shell: docker logs kafka
   register: result
-  until: ('[Kafka Server 0], started' in result.stdout)
+  until: ('Created Health topics and Kafka is running' in result.stdout)
   retries: 10
   delay: 5
-
-- name: create the health topic
-  shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.health.retentionBytes }} --config retention.ms={{ kafka.topics.health.retentionMS }} --config segment.bytes={{ kafka.topics.health.segmentBytes }}'"
-  register: command_result
-  failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
-  changed_when: "'Created topic' in command_result.stdout"
-  with_items:
-  - health
-  - cacheInvalidation
-
-- name: create the active-ack topics
-  shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic completed{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.completed.retentionBytes }} --config retention.ms={{ kafka.topics.completed.retentionMS }} --config segment.bytes={{ kafka.topics.completed.segmentBytes }}'"
-  with_indexed_items: "{{ groups['controllers'] }}"
-  register: command_result
-  failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
-  changed_when: "'Created topic' in command_result.stdout"
-
-- name: create the invoker topics
-  shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic invoker{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.invoker.retentionBytes }} --config retention.ms={{ kafka.topics.invoker.retentionMS }} --config segment.bytes={{ kafka.topics.invoker.segmentBytes }}'"
-  with_indexed_items: "{{ groups['invokers'] }}"
-  register: command_result
-  failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
-  changed_when: "'Created topic' in command_result.stdout"
diff --git a/core/controller/Dockerfile b/core/controller/Dockerfile
index ecfc0ecaaa..51c61cb202 100644
--- a/core/controller/Dockerfile
+++ b/core/controller/Dockerfile
@@ -1,5 +1,9 @@
 FROM scala
 
+# define kafka version and release version to download
+ARG KAFKA_VERSION
+ARG KAFKA_RELEASE
+
 ENV DEBIAN_FRONTEND noninteractive
 
 # Install swagger-ui
@@ -15,3 +19,14 @@ COPY build/distributions/controller.tar ./
 RUN tar xf controller.tar
 
 EXPOSE 8080
+
+# Download Kafka into /kafka and delete the tarball in the same layer, so the
+# tarball does not remain in the image. archive.apache.org keeps every
+# release, whereas the mirror /dist tree only carries current ones.
+RUN mkdir /kafka && \
+    wget --no-verbose -P /tmp https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/${KAFKA_RELEASE} && \
+    tar -xf /tmp/${KAFKA_RELEASE} -C /kafka --strip=1 && \
+    rm /tmp/${KAFKA_RELEASE}
+
+# Copy the start script
+COPY controller.sh /controller.sh
diff --git a/core/controller/build.gradle b/core/controller/build.gradle
index c4abec3657..31d90e9063 100644
--- a/core/controller/build.gradle
+++ b/core/controller/build.gradle
@@ -3,6 +3,7 @@ apply plugin: 'application'
 apply plugin: 'eclipse'
 
 ext.dockerImageName = 'controller'
+apply from: '../../gradle/settings/kafka.gradle'
 apply from: '../../gradle/docker.gradle'
 distDocker.dependsOn ':common:scala:distDocker', 'distTar'
 
diff --git a/core/controller/controller.sh b/core/controller/controller.sh
new file mode 100755
index 0000000000..295803691e
--- /dev/null
+++ b/core/controller/controller.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+#
+# Container entry point for the controller: makes sure the controller's
+# "completed<index>" Kafka topic exists, then execs the controller itself.
+#
+# Usage: controller.sh <controller index>
+#
+# Required environment: REPLICATION_FACTOR, PARTITIONS, ZOOKEEPER_HOST,
+# ZOOKEEPER_PORT and the KAFKA_TOPICS_COMPLETED_* topic settings.
+
+set -x
+
+if [ -z "$1" ]; then
+  echo "Controller index not passed as first argument to startup script"
+  exit 1
+fi
+
+CONTROLLER_INDEX=$1
+
+# Fail fast with a clear message instead of handing kafka-topics.sh an
+# empty argument.
+for var in REPLICATION_FACTOR PARTITIONS ZOOKEEPER_HOST ZOOKEEPER_PORT \
+           KAFKA_TOPICS_COMPLETED_RETENTIONBYTES \
+           KAFKA_TOPICS_COMPLETED_RETENTIONMS \
+           KAFKA_TOPICS_COMPLETED_SEGMENTBYTES; do
+  if [ -z "${!var}" ]; then
+    echo "Required environment variable $var is not set"
+    exit 1
+  fi
+done
+
+echo "Create controller topics"
+# stderr is captured as well so a failure can be reported below.
+OUTPUT=$(/kafka/bin/kafka-topics.sh --create --topic "completed$CONTROLLER_INDEX" \
+  --replication-factor "$REPLICATION_FACTOR" --partitions "$PARTITIONS" \
+  --zookeeper "${ZOOKEEPER_HOST}:${ZOOKEEPER_PORT}" \
+  --config "retention.bytes=$KAFKA_TOPICS_COMPLETED_RETENTIONBYTES" \
+  --config "retention.ms=$KAFKA_TOPICS_COMPLETED_RETENTIONMS" \
+  --config "segment.bytes=$KAFKA_TOPICS_COMPLETED_SEGMENTBYTES" 2>&1)
+
+# An already existing topic is accepted so that restarts and redeploys work.
+if ! [[ "$OUTPUT" == *"already exists"* || "$OUTPUT" == *"Created topic"* ]]; then
+  echo "Failed to create completed$CONTROLLER_INDEX topic"
+  echo "$OUTPUT"
+  exit 1
+fi
+
+# The deployment (Ansible, Kubernetes, ...) only supplies the index; exec
+# replaces this shell with the controller process.
+exec /controller/bin/controller "$CONTROLLER_INDEX"
diff --git a/core/invoker/Dockerfile b/core/invoker/Dockerfile
index 3a7a53fef7..f7afb197b3 100644
--- a/core/invoker/Dockerfile
+++ b/core/invoker/Dockerfile
@@ -1,5 +1,9 @@
 FROM scala
 
+# define kafka version and release version to download
+ARG KAFKA_VERSION
+ARG KAFKA_RELEASE
+
 ENV DOCKER_VERSION 1.12.0
 
 # Uncomment to fetch latest version of docker instead: RUN wget -qO- https://get.docker.com | sh
@@ -11,8 +15,20 @@ rm -f docker-${DOCKER_VERSION}.tgz && \
 chmod +x /usr/bin/docker && \
 chmod +x /usr/bin/docker-runc
 
+# Copy the Invoker built from gradle
 COPY build/distributions/invoker.tar ./
 RUN tar xf invoker.tar && \
 rm -f invoker.tar
 
 EXPOSE 8080
+
+# Download Kafka into /kafka and delete the tarball in the same layer, so the
+# tarball does not remain in the image. archive.apache.org keeps every
+# release, whereas the mirror /dist tree only carries current ones.
+RUN mkdir /kafka && \
+    wget --no-verbose -P /tmp https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/${KAFKA_RELEASE} && \
+    tar -xf /tmp/${KAFKA_RELEASE} -C /kafka --strip=1 && \
+    rm /tmp/${KAFKA_RELEASE}
+
+# Copy the init script
+COPY invoker.sh /invoker.sh
diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle
index 14a9191057..c3fc4f13b7 100644
--- a/core/invoker/build.gradle
+++ b/core/invoker/build.gradle
@@ -3,6 +3,7 @@ apply plugin: 'application'
 apply plugin: 'eclipse'
 
 ext.dockerImageName = 'invoker'
+apply from: '../../gradle/settings/kafka.gradle'
 apply from: '../../gradle/docker.gradle'
 distDocker.dependsOn ':common:scala:distDocker', 'distTar'
 
diff --git a/core/invoker/invoker.sh b/core/invoker/invoker.sh
new file mode 100755
index 0000000000..dc23aa7a12
--- /dev/null
+++ b/core/invoker/invoker.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+#
+# Container entry point for the invoker: makes sure the invoker's
+# "invoker<index>" Kafka topic exists, then execs the invoker itself.
+#
+# Usage: invoker.sh <invoker index>
+#
+# Required environment: REPLICATION_FACTOR, PARTITIONS, ZOOKEEPER_HOST,
+# ZOOKEEPER_PORT and the KAFKA_TOPICS_INVOKER_* topic settings.
+
+set -x
+
+if [ -z "$1" ]; then
+  echo "Invoker index not passed as first argument to startup script"
+  exit 1
+fi
+
+INVOKER_INDEX=$1
+
+# Fail fast with a clear message instead of handing kafka-topics.sh an
+# empty argument.
+for var in REPLICATION_FACTOR PARTITIONS ZOOKEEPER_HOST ZOOKEEPER_PORT \
+           KAFKA_TOPICS_INVOKER_RETENTIONBYTES \
+           KAFKA_TOPICS_INVOKER_RETENTIONMS \
+           KAFKA_TOPICS_INVOKER_SEGMENTBYTES; do
+  if [ -z "${!var}" ]; then
+    echo "Required environment variable $var is not set"
+    exit 1
+  fi
+done
+
+echo "Create invoker topics"
+# stderr is captured as well so a failure can be reported below.
+OUTPUT=$(/kafka/bin/kafka-topics.sh --create --topic "invoker$INVOKER_INDEX" \
+  --replication-factor "$REPLICATION_FACTOR" --partitions "$PARTITIONS" \
+  --zookeeper "${ZOOKEEPER_HOST}:${ZOOKEEPER_PORT}" \
+  --config "retention.bytes=$KAFKA_TOPICS_INVOKER_RETENTIONBYTES" \
+  --config "retention.ms=$KAFKA_TOPICS_INVOKER_RETENTIONMS" \
+  --config "segment.bytes=$KAFKA_TOPICS_INVOKER_SEGMENTBYTES" 2>&1)
+
+# An already existing topic is accepted so that restarts and redeploys work.
+if ! [[ "$OUTPUT" == *"already exists"* || "$OUTPUT" == *"Created topic"* ]]; then
+  echo "Failed to create invoker$INVOKER_INDEX topic"
+  echo "$OUTPUT"
+  exit 1
+fi
+
+# The deployment (Ansible, Kubernetes, ...) only supplies the index; exec
+# replaces this shell with the invoker process.
+exec /invoker/bin/invoker "$INVOKER_INDEX"
diff --git a/core/kafka/Dockerfile b/core/kafka/Dockerfile
new file mode 100644
index 0000000000..99c6aa7d26
--- /dev/null
+++ b/core/kafka/Dockerfile
@@ -0,0 +1,5 @@
+FROM ches/kafka:0.10.2.1
+# init.sh starts the broker and creates the Kafka topics it needs.
+COPY init.sh /init.sh
+# Make init.sh the container command; it launches the broker via /start.sh.
+CMD ["/init.sh"]
diff --git a/core/kafka/build.gradle b/core/kafka/build.gradle
new file mode 100644
index 0000000000..d25224a9d3
--- /dev/null
+++ b/core/kafka/build.gradle
@@ -0,0 +1,2 @@
+ext.dockerImageName = 'kafka'  // image name for core/kafka; presumably read by docker.gradle
+apply from: '../../gradle/docker.gradle'  // shared Docker build logic (presumably provides distDocker)
diff --git a/core/kafka/init.sh b/core/kafka/init.sh
new file mode 100755
index 0000000000..f6043cd43e
--- /dev/null
+++ b/core/kafka/init.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+set -mx
+
+# start the kafka process
+/start.sh &
+
+TIMEOUT=0
+echo "wait for Kafka to be up and running"
+until [ $TIMEOUT -eq 25 ]; do
+  echo "waiting for kafka to be available"
+
+  nc -z 127.0.0.1 9092
+  if [ $? -eq 0 ]; then
+    echo "kafka is up and running"
+    break
+  fi
+
+  sleep 0.2
+  let TIMEOUT=TIMEOUT+1
+done
+
+if [ $TIMEOUT -eq 25 ]; then
+  echo "failed to setup and reach kafka"
+  exit 1
+fi
+
+echo "Creating health topic"
+OUTPUT=$(unset JMX_PORT; /kafka/bin/kafka-topics.sh --create --topic health --replication-factor $REPLICATION_FACTOR --partitions $PARTITIONS --zookeeper ${ZOOKEEPER_HOST}:${ZOOKEEPER_PORT} --config retention.bytes=$KAFKA_TOPICS_HEALTH_RETENTIONBYTES --config retention.ms=$KAFKA_TOPICS_HEALTH_RETENTIONMS --config segment.bytes=$KAFKA_TOPICS_HEALTH_SEGMENTBYTES)
+if ! ([[ "$OUTPUT" == *"already exists"* ]] || [[ "$OUTPUT" == *"Created topic"* ]]); then
+  echo "Failed to create Health topic"
+  exit 1
+fi
+
+echo "Created Health topics and Kafka is running"
+
+# give controll back to the kafka process
+fg
diff --git a/gradle/settings/kafka.gradle b/gradle/settings/kafka.gradle
new file mode 100644
index 0000000000..36f54d8279
--- /dev/null
+++ b/gradle/settings/kafka.gradle
@@ -0,0 +1,5 @@
+// Docker build args (Kafka download) consumed as ARGs by the Controller and Invoker Dockerfiles
+ext.dockerBuildArgs = [
+  'KAFKA_VERSION=0.10.2.1',  // keep in sync with the ches/kafka tag in core/kafka/Dockerfile
+  'KAFKA_RELEASE=kafka_2.12-0.10.2.1.tgz'  // tarball fetched from the KAFKA_VERSION directory
+]
diff --git a/settings.gradle b/settings.gradle
index 375b6de15a..3dcfd090fe 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -2,6 +2,7 @@ include 'common:scala'
 
 include 'core:controller'
 include 'core:invoker'
+include 'core:kafka'
 include 'core:nodejsActionBase'
 include 'core:nodejs6Action'
 include 'core:actionProxy'
diff --git a/tools/build/redo b/tools/build/redo
index f47c831b93..939465149f 100755
--- a/tools/build/redo
+++ b/tools/build/redo
@@ -241,7 +241,8 @@ Components = [
 
     makeComponent('kafka',
                   'build/deploy kafka',
-                  modes = 'clean'),
+                  modes = 'clean',
+                  gradle = ':core:kafka'),
 
     makeComponent('controller',
                   'build/deploy controller',


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services