Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/20 07:36:30 UTC

[GitHub] markusthoemmes closed pull request #3258: Encrypt data in motion - Kafka

markusthoemmes closed pull request #3258: Encrypt data in motion - Kafka
URL: https://github.com/apache/incubator-openwhisk/pull/3258
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/.gitignore b/.gitignore
index bfb7f48f58..710e6f59dd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
 nginx.conf
 whisk.properties
 default.props
+/tests/src/test/resources/application.conf
 
 .ant-targets-build.xml
 /results/
@@ -60,6 +61,8 @@ ansible/tmp/*
 ansible/roles/nginx/files/openwhisk-client*
 ansible/roles/nginx/files/*.csr
 ansible/roles/nginx/files/*cert.pem
+ansible/roles/nginx/files/*p12
+ansible/roles/kafka/files/*
 
 # .zip files must be explicitly whitelisted
 *.zip
diff --git a/ansible/files/genssl.sh b/ansible/files/genssl.sh
new file mode 100755
index 0000000000..a506a7ed40
--- /dev/null
+++ b/ansible/files/genssl.sh
@@ -0,0 +1,119 @@
+#!/bin/bash
+
+set -e
+
+PASSWORD="openwhisk"
+
+if [ "$#" -lt 3 ]; then
+    echo "usage: $0 <common name: host or ip> [server|client] <scriptdir> <OPTIONAL:TrustorePassword> <OPTIONAL:generateKey>"
+    exit
+fi
+CN=$1
+TYPE=$2
+SCRIPTDIR=$3
+export TRUSTSTORE_PASSWORD=${4:-$PASSWORD}
+GENKEY=$5
+NAME_PREFIX=$6
+
+
+
+## generates a (self-signed) certificate
+if [[ -n $GENKEY ]]
+then
+  openssl genrsa -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" 2048
+fi
+function gen_csr(){
+  echo generating server certificate request
+  openssl req -new \
+      -key "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" \
+      -nodes \
+      -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
+      -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-request.csr"
+}
+function gen_cert(){
+  echo generating self-signed password-less server certificate
+  openssl x509 -req \
+      -in "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-request.csr" \
+      -signkey "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" \
+      -out ${SCRIPTDIR}/${NAME_PREFIX}openwhisk-server-cert.pem \
+      -days 365
+}
+
+if [ "$TYPE" == "server_with_JKS_keystore" ]; then
+  gen_csr
+  gen_cert
+  echo generating a new key pair in the keystore
+  keytool -genkey -v \
+    -alias $CN \
+    -dname "C=US,ST=NY,L=Yorktown,O=OpenWhisk,CN=$CN" \
+    -keystore ${SCRIPTDIR}/${NAME_PREFIX}keystore.jks \
+    -keypass:env TRUSTSTORE_PASSWORD \
+    -storepass:env TRUSTSTORE_PASSWORD \
+    -keyalg RSA \
+    -ext KeyUsage:critical="keyCertSign" \
+    -ext BasicConstraints:critical="ca:true" \
+    -validity 365
+  echo creating a certificate signing request from the keystore
+  keytool -keystore ${SCRIPTDIR}/${NAME_PREFIX}keystore.jks -alias $CN -certreq -file ${SCRIPTDIR}/${NAME_PREFIX}cert-file -storepass:env TRUSTSTORE_PASSWORD
+  echo signing the certificate request with the CA key
+  openssl x509 -req -CA ${SCRIPTDIR}/${NAME_PREFIX}openwhisk-server-cert.pem -CAkey "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" -in ${SCRIPTDIR}/${NAME_PREFIX}cert-file -out ${SCRIPTDIR}/${NAME_PREFIX}cert-signed -days 365 -CAcreateserial -passin pass:$TRUSTSTORE_PASSWORD
+  echo importing the CA certificate into the keystore
+  keytool -keystore ${SCRIPTDIR}/${NAME_PREFIX}keystore.jks -alias CARoot -import -file ${SCRIPTDIR}/${NAME_PREFIX}openwhisk-server-cert.pem -storepass:env TRUSTSTORE_PASSWORD -noprompt
+  echo importing the signed certificate into the keystore
+  keytool -keystore ${SCRIPTDIR}/${NAME_PREFIX}keystore.jks -alias $CN -import -file ${SCRIPTDIR}/${NAME_PREFIX}cert-signed -storepass:env TRUSTSTORE_PASSWORD -noprompt
+
+elif [ "$TYPE" == "server" ]; then
+    gen_csr
+    gen_cert
+    echo generate keystore
+    openssl pkcs12 -export -name $CN \
+         -passout pass:$TRUSTSTORE_PASSWORD \
+         -in "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-cert.pem" \
+         -inkey "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" \
+         -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-keystore.p12"
+else
+    echo generating client ca key
+    openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-ca-key.pem" 2048
+
+    echo generating client ca request
+    openssl req -new \
+    -key "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
+    -passin pass:$PASSWORD \
+    -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
+    -out "$SCRIPTDIR/openwhisk-client-ca.csr"
+
+    echo generating client ca pem
+    openssl x509 -req \
+    -in "$SCRIPTDIR/openwhisk-client-ca.csr" \
+    -signkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
+    -passin pass:$PASSWORD \
+    -out "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
+    -days 365 -sha1 -extensions v3_ca
+
+    echo generating client key
+    openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-key.pem" 2048
+
+    echo generating client certificate csr file
+    openssl req -new \
+    -key "$SCRIPTDIR/openwhisk-client-key.pem" \
+    -passin pass:$PASSWORD \
+    -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=guest" \
+    -out "$SCRIPTDIR/openwhisk-client-certificate-request.csr"
+
+    echo generating self-signed client certificate
+    echo 01 > $SCRIPTDIR/openwhisk-client-ca-cert.srl
+    openssl x509 -req \
+    -in "$SCRIPTDIR/openwhisk-client-certificate-request.csr" \
+    -CA "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
+    -CAkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
+    -CAserial "$SCRIPTDIR/openwhisk-client-ca-cert.srl" \
+    -passin pass:$PASSWORD \
+    -out "$SCRIPTDIR/openwhisk-client-cert.pem" \
+    -days 365 -sha1 -extensions v3_req
+
+    echo remove client key\'s password
+    openssl rsa \
+    -in "$SCRIPTDIR/openwhisk-client-key.pem" \
+    -passin pass:$PASSWORD \
+    -out "$SCRIPTDIR/openwhisk-client-key.pem"
+fi
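
For a quick sanity check that the generated JKS keystore opens with the
configured password, the JDK KeyStore API is enough. A minimal Scala sketch
(not part of this PR), assuming the default output path and the "openwhisk"
password from ansible/group_vars/all:

    import java.io.FileInputStream
    import java.security.KeyStore
    import scala.collection.JavaConverters._

    val ks = KeyStore.getInstance("JKS")
    val in = new FileInputStream("ansible/roles/kafka/files/kafka-keystore.jks")
    try ks.load(in, "openwhisk".toCharArray) // throws if the password or format is wrong
    finally in.close()
    // Expect the broker alias and the "CARoot" entry imported by the script.
    ks.aliases.asScala.foreach(println)
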
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 23eeeac2c6..83567337e7 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -75,8 +75,15 @@ registry:
   confdir: "{{ config_root_dir }}/registry"
 
 kafka:
+  ssl:
+    client_authentication: required
+    keystore:
+      name: kafka-keystore.jks
+      password: openwhisk
+  protocol: "{{ kafka_protocol_for_setup }}"
   version: 0.11.0.1
-  port: 9092
+  port: 9072
+  advertisedPort: 9093
   ras:
     port: 8093
   heap: "{{ kafka_heap | default('1g') }}"
@@ -84,10 +91,12 @@ kafka:
 
 kafka_connect_string: "{% set ret = [] %}\
                        {% for host in groups['kafkas'] %}\
-                         {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.port+loop.index-1)|string) ) }}\
+                         {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.advertisedPort+loop.index-1)|string) ) }}\
                        {% endfor %}\
                        {{ ret | join(',') }}"
 
+kafka_protocol_for_setup: "{{ kafka_protocol | default('PLAINTEXT') }}"
+
 zookeeper:
   version: 3.4
   port: 2181
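
Note the split introduced above: in the SSL setup, kafka.port (9072) serves the
internal plaintext listener while advertisedPort (9093) is what external
clients dial, and each broker is offset by its index. A Scala sketch of the
connect-string loop, with hypothetical ansible_host values:

    val advertisedPort = 9093
    val kafkaHosts = Seq("172.17.0.2", "172.17.0.3") // hypothetical ansible_host values
    val kafkaConnectString = kafkaHosts.zipWithIndex
      .map { case (host, i) => s"$host:${advertisedPort + i}" }
      .mkString(",") // "172.17.0.2:9093,172.17.0.3:9094"
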
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index f07566400e..52dbc807e6 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -45,6 +45,12 @@
     dest: "{{ controller.confdir }}/{{ controller_name }}/jmxremote.access"
     mode: 0777
 
+- name: "copy kafka truststore/keystore"
+  when: kafka.protocol == 'SSL'
+  copy:
+    src: "{{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}"
+    dest: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}"
+
 - name: check, that required databases exist
   include: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
   vars:
@@ -106,7 +112,11 @@
       "CONFIG_whisk_kafka_topics_health_retentionBytes": "{{ kafka_topics_health_retentionBytes | default() }}"
       "CONFIG_whisk_kafka_topics_health_retentionMs": "{{ kafka_topics_health_retentionMS | default() }}"
       "CONFIG_whisk_kafka_topics_health_segmentBytes": "{{ kafka_topics_health_segmentBytes | default() }}"
-
+      "CONFIG_whisk_kafka_common_securityProtocol": "{{ kafka.protocol }}"
+      "CONFIG_whisk_kafka_common_sslTruststoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
+      "CONFIG_whisk_kafka_common_sslTruststorePassword": "{{ kafka.ssl.keystore.password }}"
+      "CONFIG_whisk_kafka_common_sslKeystoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
+      "CONFIG_whisk_kafka_common_sslKeystorePassword": "{{ kafka.ssl.keystore.password }}"
       "DB_PROTOCOL": "{{ db_protocol }}"
       "DB_PROVIDER": "{{ db_provider }}"
       "DB_HOST": "{{ db_host }}"
@@ -157,7 +167,6 @@
 
       "CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
       "CONFIG_whisk_spi_LoadBalancerProvider": "{{ controller.loadbalancer.spi }}"
-      
       "CONFIG_logback_log_level": "{{ controller.loglevel }}"
 
       "CONFIG_whisk_transactions_stride": "{{ transactions.stride | default() }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index be2a251346..0f3bdc6972 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -60,6 +60,12 @@
     mode: 0777
   become: "{{ invoker.dir.become }}"
 
+- name: "copy kafka truststore/keystore"
+  when: kafka.protocol == 'SSL'
+  copy:
+    src: "{{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}"
+    dest: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}"
+
 - name: check, that required databases exist
   include: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
   vars:
@@ -149,6 +155,11 @@
         -e CONFIG_whisk_kafka_topics_invoker_retentionBytes='{{ kafka_topics_invoker_retentionBytes | default() }}'
         -e CONFIG_whisk_kafka_topics_invoker_retentionMs='{{ kafka_topics_invoker_retentionMS | default() }}'
         -e CONFIG_whisk_kakfa_topics_invoker_segmentBytes='{{ kafka_topics_invoker_segmentBytes | default() }}'
+        -e CONFIG_whisk_kafka_common_securityProtocol='{{ kafka.protocol }}'
+        -e CONFIG_whisk_kafka_common_sslTruststoreLocation='/conf/{{ kafka.ssl.keystore.name }}'
+        -e CONFIG_whisk_kafka_common_sslTruststorePassword='{{ kafka.ssl.keystore.password }}'
+        -e CONFIG_whisk_kafka_common_sslKeystoreLocation='/conf/{{ kafka.ssl.keystore.name }}'
+        -e CONFIG_whisk_kafka_common_sslKeystorePassword='{{ kafka.ssl.keystore.password }}'
         -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
         -e DB_PROTOCOL='{{ db_protocol }}'
         -e DB_PROVIDER='{{ db_provider }}'
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index cf599195e5..f7c3270711 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -6,6 +6,67 @@
   retries: "{{ docker.pull.retries }}"
   delay: "{{ docker.pull.delay }}"
 
+- name: "create kafka certificate directory"
+  file:
+    path: "{{ config_root_dir }}/kafka/certs"
+    state: directory
+    mode: 0777
+
+- name: "copy keystore"
+  when: kafka.protocol == 'SSL'
+  copy:
+    src: "files/{{ kafka.ssl.keystore.name }}"
+    dest: "{{ config_root_dir }}/kafka/certs"
+
+- name: add kafka default env vars
+  set_fact:
+    kafka_env_vars:
+      "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+      "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
+      "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
+      "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+      "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
+
+- name: add kafka non-ssl vars
+  when: kafka.protocol != 'SSL'
+  set_fact:
+    kafka_non_ssl_vars:
+      "KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
+- name: add kafka ssl env vars
+  when: kafka.protocol == 'SSL'
+  set_fact:
+    kafka_ssl_env_vars:
+      "KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:SSL"
+      "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_PROTOCOL_NAME": "INTERNAL"
+      "KAFKA_SSL_KEYSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
+      "KAFKA_SSL_KEYSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
+      "KAFKA_SSL_KEY_PASSWORD": "{{ kafka.ssl.keystore.password }}"
+      "KAFKA_SSL_TRUSTSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
+      "KAFKA_SSL_TRUSTSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
+      "KAFKA_SSL_CLIENT_AUTH": "{{ kafka.ssl.client_authentication }}"
+    # The sed script passed in CUSTOM_INIT_SCRIPT fixes a bug in the wurstmeister docker image
+    # by patching the server.properties file right before kafka is started.
+    # The script adds the missing advertised hostname to the advertised.listeners property.
+    # Issue: https://github.com/wurstmeister/kafka-docker/issues/221
+      "CUSTOM_INIT_SCRIPT": sed -i \'/^advertised\\.listeners/ s/\\/\\/\\:/\\/\\/{{ ansible_host }}\\:/\' /opt/kafka/config/server.properties
+
+- name: "join kafka ssl env vars"
+  when: kafka.protocol == 'SSL'
+  set_fact:
+    kafka_env_vars: "{{ kafka_env_vars | combine(kafka_ssl_env_vars) }}"
+
+- name: join kafka non-ssl env vars
+  when: kafka.protocol != 'SSL'
+  set_fact:
+    kafka_env_vars: "{{ kafka_env_vars | combine(kafka_non_ssl_vars) }}"
+
 - name: (re)start kafka
   vars:
     zookeeper_idx: "{{ groups['kafkas'].index(inventory_hostname) % (groups['zookeepers'] | length) }}"
@@ -15,17 +76,12 @@
     state: started
     recreate: true
     restart_policy: "{{ docker.restart.policy }}"
-    env:
-      "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
-      "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
-      "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
-      "KAFKA_ADVERTISED_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
-      "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
-      "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
-      "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
-      "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
+    env: "{{ kafka_env_vars }}"
     ports:
-      - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
+      - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+      - "{{ kafka.advertisedPort  + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+    volumes:
+      - "{{ config_root_dir }}/kafka/certs:/config"
 
 - name: wait until the kafka server started up
   shell: docker logs kafka{{ groups['kafkas'].index(inventory_hostname) }}
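
The set_fact tasks above assemble one env map per protocol: the base vars are
always present, and either the SSL or the non-SSL block is merged on top.
Ansible's combine filter behaves like Scala's Map ++ (right-hand side wins on
key collisions); a sketch with abbreviated values:

    val base = Map(
      "KAFKA_BROKER_ID" -> "0",
      "KAFKA_AUTO_CREATE_TOPICS_ENABLE" -> "false")
    val ssl = Map(
      "KAFKA_PROTOCOL_NAME" -> "INTERNAL",
      "KAFKA_SSL_CLIENT_AUTH" -> "required")
    val env = base ++ ssl // the merged map handed to the container as its env
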
diff --git a/ansible/roles/nginx/files/genssl.sh b/ansible/roles/nginx/files/genssl.sh
deleted file mode 100755
index e61e9b23e1..0000000000
--- a/ansible/roles/nginx/files/genssl.sh
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/bin/bash
-
-set -e
-
-SCRIPTDIR="$(cd $(dirname "$0")/ && pwd)"
-
-if [ "$#" -lt 2 ]; then
-    echo "usage: $0 <common name: host or ip> [server|client]"
-    exit
-fi
-CN=$1
-TYPE=$2
-PASSWORD="openwhisk"
-
-## generates a (self-signed) certificate
-
-## uncomment to regenerate the key
-#openssl genrsa -out "$SCRIPTDIR/openwhisk-server-key.pem" 2048
-
-if [ "$TYPE" == "server" ]; then
-    echo generating server certificate request
-    openssl req -new \
-        -key "$SCRIPTDIR/openwhisk-server-key.pem" \
-        -nodes \
-        -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
-        -out "$SCRIPTDIR/openwhisk-server-request.csr"
-
-    echo generating self-signed password-less server certificate
-    openssl x509 -req \
-        -in "$SCRIPTDIR/openwhisk-server-request.csr" \
-        -signkey "$SCRIPTDIR/openwhisk-server-key.pem" \
-        -out "$SCRIPTDIR/openwhisk-server-cert.pem" \
-        -days 365
-else
-    echo generating client ca key
-    openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-ca-key.pem" 2048
-
-    echo generating client ca request
-    openssl req -new \
-    -key "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
-    -passin pass:$PASSWORD \
-    -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
-    -out "$SCRIPTDIR/openwhisk-client-ca.csr"
-
-    echo generating client ca pem
-    openssl x509 -req \
-    -in "$SCRIPTDIR/openwhisk-client-ca.csr" \
-    -signkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
-    -passin pass:$PASSWORD \
-    -out "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
-    -days 365 -sha1 -extensions v3_ca
-
-    echo generating client key
-    openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-key.pem" 2048
-
-    echo generating client certificate csr file
-    openssl req -new \
-    -key "$SCRIPTDIR/openwhisk-client-key.pem" \
-    -passin pass:$PASSWORD \
-    -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=guest" \
-    -out "$SCRIPTDIR/openwhisk-client-certificate-request.csr"
-
-    echo generating self-signed client certificate
-    echo 01 > $SCRIPTDIR/openwhisk-client-ca-cert.srl
-    openssl x509 -req \
-    -in "$SCRIPTDIR/openwhisk-client-certificate-request.csr" \
-    -CA "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
-    -CAkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
-    -CAserial "$SCRIPTDIR/openwhisk-client-ca-cert.srl" \
-    -passin pass:$PASSWORD \
-    -out "$SCRIPTDIR/openwhisk-client-cert.pem" \
-    -days 365 -sha1 -extensions v3_req
-
-    echo remove client key\'s password
-    openssl rsa \
-    -in "$SCRIPTDIR/openwhisk-client-key.pem" \
-    -passin pass:$PASSWORD \
-    -out "$SCRIPTDIR/openwhisk-client-key.pem"
-fi
diff --git a/ansible/setup.yml b/ansible/setup.yml
index a5c28f9214..6a20d99216 100644
--- a/ansible/setup.yml
+++ b/ansible/setup.yml
@@ -17,14 +17,34 @@
       docker_machine_ip: "{{ result.stdout }}"
     when: "'environments/docker-machine' in hosts_dir"
 
-  - name: gen hosts for docker-machine 
+  - name: gen hosts for docker-machine
     local_action: template src="{{playbook_dir}}/environments/docker-machine/hosts.j2.ini" dest="{{ playbook_dir }}/environments/docker-machine/hosts"
     when: "'environments/docker-machine' in hosts_dir"
 
   - name: gen untrusted server certificate for host
-    local_action: shell "{{ playbook_dir }}/roles/nginx/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "server"
+    local_action: shell "{{ playbook_dir }}/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "server" "{{ playbook_dir }}/roles/nginx/files"
     when: nginx.ssl.cert == "openwhisk-server-cert.pem"
-    
+
   - name: gen untrusted client certificate for host
-    local_action: shell "{{ playbook_dir }}/roles/nginx/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "client"
+    local_action: shell "{{ playbook_dir }}/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "client" "{{ playbook_dir }}/roles/nginx/files"
     when: nginx.ssl.client_ca_cert == "openwhisk-client-ca-cert.pem"
+
+  - name: clean up old kafka keystore
+    file:
+      path: "{{ playbook_dir }}/roles/kafka/files"
+      state: absent
+    become: "{{ logs.dir.become }}"
+    when: kafka_protocol_for_setup == 'SSL'
+
+  - name: ensure kafka files directory exists
+    file:
+      path: "{{ playbook_dir }}/roles/kafka/files/"
+      state: directory
+      mode: 0777
+    become: "{{ logs.dir.become }}"
+    when: kafka_protocol_for_setup == 'SSL'
+
+  - name: generate kafka certificates
+    local_action: shell "{{ playbook_dir }}/files/genssl.sh" "openwhisk-kafka" "server_with_JKS_keystore" "{{ playbook_dir }}/roles/kafka/files" openwhisk "generateKey" "kafka-"
+    when: kafka_protocol_for_setup == 'SSL'
diff --git a/ansible/tasks/writeWhiskProperties.yml b/ansible/tasks/writeWhiskProperties.yml
index 4f5fc14e20..e94de221cf 100644
--- a/ansible/tasks/writeWhiskProperties.yml
+++ b/ansible/tasks/writeWhiskProperties.yml
@@ -1,8 +1,13 @@
 ---
-# This task will write whisk.properties to the openwhisk_home. 
+# This task will write whisk.properties to the openwhisk_home.
 # Currently whisk.properties is still needed for tests.
 
 - name: write whisk.properties template to openwhisk_home
   template:
     src: whisk.properties.j2
     dest: "{{ openwhisk_home }}/whisk.properties"
+
+- name: write the tests' application.conf overrides
+  template:
+    src: "{{ openwhisk_home }}/tests/src/test/resources/application.conf.j2"
+    dest: "{{ openwhisk_home }}/tests/src/test/resources/application.conf"
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 84682862d7..3e09bba4d8 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -54,10 +54,25 @@ whisk {
     kafka {
         replication-factor = 1
 
+        common {
+            security-protocol = PLAINTEXT
+        }
         producer {
             acks = 1
             max-request-size = ${whisk.activation.payload.max}
         }
+        consumer {
+            session-timeout-ms = 30000
+            heartbeat-interval-ms = 10000
+            enable-auto-commit = true
+            auto-commit-interval-ms = 10000
+            auto-offset-reset = earliest
+            max-poll-interval-ms = 360000
+            // This value controls the server-side wait time which affects polling latency.
+            // A low value improves latency performance but it is important to not set it too low
+            // as that will cause excessive busy-waiting.
+            fetch-max-wait-ms = 20
+        }
 
         topics {
             cache-invalidation {
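
These consumer defaults replace the constructor parameters that the Scala diff
below removes from KafkaConsumerConnector. The connector reads the block back
as a flat map via pureconfig and hands it to
KafkaConfiguration.configMapToKafkaConfig, which is not shown in this diff;
presumably it only rewrites the kebab-case HOCON keys into Kafka's dotted
property names. A sketch under those assumptions:

    import pureconfig.loadConfigOrThrow

    // Reads the whisk.kafka.consumer block as key/value pairs, e.g.
    // Map("session-timeout-ms" -> "30000", "fetch-max-wait-ms" -> "20", ...)
    val consumerSettings = loadConfigOrThrow[Map[String, String]]("whisk.kafka.consumer")

    // Hypothetical equivalent of configMapToKafkaConfig:
    // "session-timeout-ms" -> "session.timeout.ms", etc.
    val kafkaProps = consumerSettings.map { case (k, v) => k.replace("-", ".") -> v }
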
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index a1116d8e69..09cd3ac648 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -23,23 +23,18 @@ import scala.collection.JavaConversions.iterableAsScalaIterable
 import scala.collection.JavaConversions.seqAsJavaList
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-
+import pureconfig.loadConfigOrThrow
 import whisk.common.Logging
+import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
 class KafkaConsumerConnector(kafkahost: String,
                              groupid: String,
                              topic: String,
-                             override val maxPeek: Int = Int.MaxValue,
-                             readeos: Boolean = true,
-                             sessionTimeout: FiniteDuration = 30.seconds,
-                             autoCommitInterval: FiniteDuration = 10.seconds,
-                             maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging)
+                             override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging)
     extends MessageConsumer {
 
   /**
@@ -68,19 +63,14 @@ class KafkaConsumerConnector(kafkahost: String,
     val props = new Properties
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid)
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
-    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toMillis.toString)
-    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, (sessionTimeout.toMillis / 3).toString)
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true.toString)
-    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval.toMillis.toString)
     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPeek.toString)
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (!readeos) "latest" else "earliest")
-    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)
-
-    // This value controls the server-side wait time which affects polling latency.
-    // A low value improves latency performance but it is important to not set it too low
-    // as that will cause excessive busy-waiting.
-    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "20")
 
+    val config =
+      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
+    config.foreach {
+      case (key, value) => props.put(key, value)
+    }
     props
   }
 
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index ac148cd554..3c8df850e1 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -23,19 +23,16 @@ import java.util.concurrent.ExecutionException
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.FiniteDuration
 import scala.collection.JavaConverters._
-
 import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.clients.admin.AdminClient
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.errors.TopicExistsException
-
 import whisk.common.Logging
 import whisk.core.ConfigKeys
 import whisk.core.WhiskConfig
 import whisk.core.connector.MessageConsumer
 import whisk.core.connector.MessageProducer
 import whisk.core.connector.MessagingProvider
-
 import pureconfig._
 
 case class KafkaConfig(replicationFactor: Short)
@@ -44,9 +41,10 @@ case class KafkaConfig(replicationFactor: Short)
  * A Kafka based implementation of MessagingProvider
  */
 object KafkaMessagingProvider extends MessagingProvider {
+
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging): MessageConsumer =
-    new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
+    new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek)
 
   def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
     new KafkaProducerConnector(config.kafkaHosts, ec)
@@ -57,6 +55,14 @@ object KafkaMessagingProvider extends MessagingProvider {
       loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
     val props = new Properties
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
+    val commonConfig =
+      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
+    commonConfig.foreach {
+      case (key, value) => {
+        props.put(key, value)
+      }
+    }
+
     val client = AdminClient.create(props)
     val numPartitions = 1
     val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
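
With the common block merged in, the AdminClient used by ensureTopic speaks
the same security protocol as the producer and consumer. A standalone sketch
of the resulting client properties, using the defaults from
ansible/group_vars/all and a hypothetical broker address:

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

    val props = new Properties
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9093")
    props.put("security.protocol", "SSL")
    props.put("ssl.truststore.location", "/conf/kafka-keystore.jks")
    props.put("ssl.truststore.password", "openwhisk")
    val admin = AdminClient.create(props) // topic creation now runs over TLS
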
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 7b0977b825..dca7ccec39 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -89,8 +89,12 @@ class KafkaProducerConnector(kafkahosts: String,
 
     // Load additional config from the config files and add them here.
     val config =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
-    config.foreach { case (key, value) => props.put(key, value) }
+      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
+
+    config.foreach {
+      case (key, value) => props.put(key, value)
+    }
     props
   }
 
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index cf66627ed0..79c6d89003 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -236,7 +236,9 @@ object ConfigKeys {
   val loadbalancer = "whisk.loadbalancer"
 
   val kafka = "whisk.kafka"
+  val kafkaCommon = s"$kafka.common"
   val kafkaProducer = s"$kafka.producer"
+  val kafkaConsumer = s"$kafka.consumer"
   val kafkaTopics = s"$kafka.topics"
 
   val memory = "whisk.memory"
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf.j2
similarity index 55%
rename from tests/src/test/resources/application.conf
rename to tests/src/test/resources/application.conf.j2
index 53f420d0bf..94f16642a4 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf.j2
@@ -20,5 +20,15 @@ whisk {
                 retention-ms    = 3600000
             }
         }
+        common {
+          security-protocol: {{ kafka.protocol }}
+          ssl-truststore-location: {{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}
+          ssl-truststore-password: {{ kafka.ssl.keystore.password }}
+          ssl-keystore-location: {{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}
+          ssl-keystore-password: {{ kafka.ssl.keystore.password }}
+        }
+        consumer {
+          max-poll-interval-ms: 10000
+        }
     }
 }
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 7761856285..d987cb79c2 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -46,6 +46,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
 
   val groupid = "kafkatest"
   val topic = "KafkaConnectorTestTopic"
+  val maxPollInterval = 10 seconds
 
   // Need to overwrite replication factor for tests that shut down and start
   // Kafka instances intentionally. These tests will fail if there is more than
@@ -56,15 +57,11 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
   assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
 
-  val sessionTimeout: FiniteDuration = 10 seconds
-  val maxPollInterval: FiniteDuration = 10 seconds
+  println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}")
+  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed")
+
   val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
-  val consumer = new KafkaConsumerConnector(
-    config.kafkaHosts,
-    groupid,
-    topic,
-    sessionTimeout = sessionTimeout,
-    maxPollInterval = maxPollInterval)
+  val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
 
   override def afterAll(): Unit = {
     producer.close()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services