You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/20 07:36:28 UTC

[incubator-openwhisk] branch master updated: Secure connections/data-in-motion to Kafka using SSL. (#3258)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d53cc32  Secure connections/data-in-motion to Kafka using SSL. (#3258)
d53cc32 is described below

commit d53cc320fea6ea11b1bfdd90da9ac87c39e89e6c
Author: Vadim Raskin <ra...@gmail.com>
AuthorDate: Tue Feb 20 08:36:25 2018 +0100

    Secure connections/data-in-motion to Kafka using SSL. (#3258)
---
 .gitignore                                         |   3 +
 ansible/files/genssl.sh                            | 119 +++++++++++++++++++++
 ansible/group_vars/all                             |  13 ++-
 ansible/roles/controller/tasks/deploy.yml          |  13 ++-
 ansible/roles/invoker/tasks/deploy.yml             |  11 ++
 ansible/roles/kafka/tasks/deploy.yml               |  76 +++++++++++--
 ansible/roles/nginx/files/genssl.sh                |  79 --------------
 ansible/setup.yml                                  |  28 ++++-
 ansible/tasks/writeWhiskProperties.yml             |   7 +-
 common/scala/src/main/resources/application.conf   |  15 +++
 .../connector/kafka/KafkaConsumerConnector.scala   |  28 ++---
 .../connector/kafka/KafkaMessagingProvider.scala   |  14 ++-
 .../connector/kafka/KafkaProducerConnector.scala   |   8 +-
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   2 +
 .../{application.conf => application.conf.j2}      |  10 ++
 .../test/scala/services/KafkaConnectorTests.scala  |  13 +--
 16 files changed, 308 insertions(+), 131 deletions(-)

diff --git a/.gitignore b/.gitignore
index bfb7f48..710e6f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
 nginx.conf
 whisk.properties
 default.props
+/tests/src/test/resources/application.conf
 
 .ant-targets-build.xml
 /results/
@@ -60,6 +61,8 @@ ansible/tmp/*
 ansible/roles/nginx/files/openwhisk-client*
 ansible/roles/nginx/files/*.csr
 ansible/roles/nginx/files/*cert.pem
+ansible/roles/nginx/files/*p12
+ansible/roles/kafka/files/*
 
 # .zip files must be explicited whitelisted
 *.zip
diff --git a/ansible/files/genssl.sh b/ansible/files/genssl.sh
new file mode 100755
index 0000000..a506a7e
--- /dev/null
+++ b/ansible/files/genssl.sh
@@ -0,0 +1,119 @@
+#!/bin/bash
+
+set -e
+
+# Default password for generated keys and stores when none is passed in.
+PASSWORD="openwhisk"
+
+if [ "$#" -lt 3 ]; then
+    echo "usage: $0 <common name: host or ip> [server|server_with_JKS_keystore|client] <scriptdir> <OPTIONAL:TruststorePassword> <OPTIONAL:generateKey> <OPTIONAL:namePrefix>" >&2
+    exit 1
+fi
+CN=$1
+TYPE=$2
+SCRIPTDIR=$3
+# Exported because keytool reads it by name (-storepass:env / -keypass:env).
+export TRUSTSTORE_PASSWORD=${4:-$PASSWORD}
+GENKEY=$5
+NAME_PREFIX=$6
+
+## generates a (self-signed) certificate
+if [[ -n $GENKEY ]]
+then
+  openssl genrsa -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" 2048
+fi
+function gen_csr(){
+  echo generating server certificate request
+  openssl req -new \
+      -key "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" \
+      -nodes \
+      -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
+      -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-request.csr"
+}
+function gen_cert(){  # self-signs the request from gen_csr with the server key; valid for 365 days
+  echo generating self-signed password-less server certificate
+  openssl x509 -req \
+      -in "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-request.csr" \
+      -signkey "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" \
+      -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-cert.pem" \
+      -days 365
+}
+
+if [ "$TYPE" == "server_with_JKS_keystore" ]; then  # self-signed CA cert + JKS keystore with a cert signed by it
+  gen_csr
+  gen_cert
+  echo generate new key and place it in the keystore
+  keytool -genkey -v \
+    -alias "$CN" \
+    -dname "C=US,ST=NY,L=Yorktown,O=OpenWhisk,CN=$CN" \
+    -keystore "${SCRIPTDIR}/${NAME_PREFIX}keystore.jks" \
+    -keypass:env TRUSTSTORE_PASSWORD \
+    -storepass:env TRUSTSTORE_PASSWORD \
+    -keyalg RSA \
+    -ext KeyUsage:critical="keyCertSign" \
+    -ext BasicConstraints:critical="ca:true" \
+    -validity 365
+  echo create a certificate signing request for the keystore entry
+  keytool -keystore "${SCRIPTDIR}/${NAME_PREFIX}keystore.jks" -alias "$CN" -certreq -file "${SCRIPTDIR}/${NAME_PREFIX}cert-file" -storepass:env TRUSTSTORE_PASSWORD
+  echo sign the certificate with private key
+  openssl x509 -req -CA "${SCRIPTDIR}/${NAME_PREFIX}openwhisk-server-cert.pem" -CAkey "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" -in "${SCRIPTDIR}/${NAME_PREFIX}cert-file" -out "${SCRIPTDIR}/${NAME_PREFIX}cert-signed" -days 365 -CAcreateserial -passin "pass:$TRUSTSTORE_PASSWORD"
+  echo import CA cert in the keystore
+  keytool -keystore "${SCRIPTDIR}/${NAME_PREFIX}keystore.jks" -alias CARoot -import -file "${SCRIPTDIR}/${NAME_PREFIX}openwhisk-server-cert.pem" -storepass:env TRUSTSTORE_PASSWORD -noprompt
+  echo import the signed certificate in the keystore
+  keytool -keystore "${SCRIPTDIR}/${NAME_PREFIX}keystore.jks" -alias "$CN" -import -file "${SCRIPTDIR}/${NAME_PREFIX}cert-signed" -storepass:env TRUSTSTORE_PASSWORD -noprompt
+
+elif [ "$TYPE" == "server" ]; then  # self-signed server cert bundled with its key into a PKCS#12 file
+    gen_csr
+    gen_cert
+    echo generate keystore
+    openssl pkcs12 -export -name "$CN" \
+         -passout "pass:$TRUSTSTORE_PASSWORD" \
+         -in "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-cert.pem" \
+         -inkey "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" \
+         -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-keystore.p12"
+else  # any other type: client CA plus a client certificate for CN=guest signed by that CA
+    echo generating client ca key
+    openssl genrsa -aes256 -passout "pass:$PASSWORD" -out "$SCRIPTDIR/openwhisk-client-ca-key.pem" 2048
+
+    echo generating client ca request
+    openssl req -new \
+    -key "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
+    -passin "pass:$PASSWORD" \
+    -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
+    -out "$SCRIPTDIR/openwhisk-client-ca.csr"
+
+    echo generating client ca pem
+    openssl x509 -req \
+    -in "$SCRIPTDIR/openwhisk-client-ca.csr" \
+    -signkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
+    -passin "pass:$PASSWORD" \
+    -out "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
+    -days 365 -sha1 -extensions v3_ca
+
+    echo generating client key
+    openssl genrsa -aes256 -passout "pass:$PASSWORD" -out "$SCRIPTDIR/openwhisk-client-key.pem" 2048
+
+    echo generating client certificate csr file
+    openssl req -new \
+    -key "$SCRIPTDIR/openwhisk-client-key.pem" \
+    -passin "pass:$PASSWORD" \
+    -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=guest" \
+    -out "$SCRIPTDIR/openwhisk-client-certificate-request.csr"
+
+    echo generating self-signed client certificate
+    echo 01 > "$SCRIPTDIR/openwhisk-client-ca-cert.srl"
+    openssl x509 -req \
+    -in "$SCRIPTDIR/openwhisk-client-certificate-request.csr" \
+    -CA "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
+    -CAkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
+    -CAserial "$SCRIPTDIR/openwhisk-client-ca-cert.srl" \
+    -passin "pass:$PASSWORD" \
+    -out "$SCRIPTDIR/openwhisk-client-cert.pem" \
+    -days 365 -sha1 -extensions v3_req
+
+    echo remove client key\'s password
+    openssl rsa \
+    -in "$SCRIPTDIR/openwhisk-client-key.pem" \
+    -passin "pass:$PASSWORD" \
+    -out "$SCRIPTDIR/openwhisk-client-key.pem"
+fi
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 23eeeac..8356733 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -75,8 +75,15 @@ registry:
   confdir: "{{ config_root_dir }}/registry"
 
 kafka:
+  ssl:
+    client_authentication: required
+    keystore:
+      name: kafka-keystore.jks
+      password: openwhisk
+  protocol: "{{ kafka_protocol_for_setup }}"
   version: 0.11.0.1
-  port: 9092
+  port: 9072
+  advertisedPort: 9093
   ras:
     port: 8093
   heap: "{{ kafka_heap | default('1g') }}"
@@ -84,10 +91,12 @@ kafka:
 
 kafka_connect_string: "{% set ret = [] %}\
                        {% for host in groups['kafkas'] %}\
-                         {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.port+loop.index-1)|string) ) }}\
+                         {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.advertisedPort+loop.index-1)|string) ) }}\
                        {% endfor %}\
                        {{ ret | join(',') }}"
 
+kafka_protocol_for_setup: "{{ kafka_protocol | default('PLAINTEXT') }}"
+
 zookeeper:
   version: 3.4
   port: 2181
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index f075664..52dbc80 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -45,6 +45,12 @@
     dest: "{{ controller.confdir }}/{{ controller_name }}/jmxremote.access"
     mode: 0777
 
+- name: "copy kafka truststore/keystore"
+  when: kafka.protocol == 'SSL'
+  copy:
+    src: "{{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}"
+    dest: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}"
+
 - name: check, that required databases exist
   include: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
   vars:
@@ -106,7 +112,11 @@
       "CONFIG_whisk_kafka_topics_health_retentionBytes": "{{ kafka_topics_health_retentionBytes | default() }}"
       "CONFIG_whisk_kafka_topics_health_retentionMs": "{{ kafka_topics_health_retentionMS | default() }}"
       "CONFIG_whisk_kafka_topics_health_segmentBytes": "{{ kafka_topics_health_segmentBytes | default() }}"
-
+      "CONFIG_whisk_kafka_common_securityProtocol": "{{ kafka.protocol }}"
+      "CONFIG_whisk_kafka_common_sslTruststoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
+      "CONFIG_whisk_kafka_common_sslTruststorePassword": "{{ kafka.ssl.keystore.password }}"
+      "CONFIG_whisk_kafka_common_sslKeystoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
+      "CONFIG_whisk_kafka_common_sslKeystorePassword": "{{ kafka.ssl.keystore.password }}"
       "DB_PROTOCOL": "{{ db_protocol }}"
       "DB_PROVIDER": "{{ db_provider }}"
       "DB_HOST": "{{ db_host }}"
@@ -157,7 +167,6 @@
 
       "CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
       "CONFIG_whisk_spi_LoadBalancerProvider": "{{ controller.loadbalancer.spi }}"
-      
       "CONFIG_logback_log_level": "{{ controller.loglevel }}"
 
       "CONFIG_whisk_transactions_stride": "{{ transactions.stride | default() }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index be2a251..0f3bdc6 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -60,6 +60,12 @@
     mode: 0777
   become: "{{ invoker.dir.become }}"
 
+- name: "copy kafka truststore/keystore"
+  when: kafka.protocol == 'SSL'
+  copy:
+    src: "{{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}"
+    dest: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}"
+
 - name: check, that required databases exist
   include: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
   vars:
@@ -149,6 +155,11 @@
         -e CONFIG_whisk_kafka_topics_invoker_retentionBytes='{{ kafka_topics_invoker_retentionBytes | default() }}'
         -e CONFIG_whisk_kafka_topics_invoker_retentionMs='{{ kafka_topics_invoker_retentionMS | default() }}'
         -e CONFIG_whisk_kakfa_topics_invoker_segmentBytes='{{ kafka_topics_invoker_segmentBytes | default() }}'
+        -e CONFIG_whisk_kafka_common_securityProtocol='{{ kafka.protocol }}'
+        -e CONFIG_whisk_kafka_common_sslTruststoreLocation='/conf/{{ kafka.ssl.keystore.name }}'
+        -e CONFIG_whisk_kafka_common_sslTruststorePassword='{{ kafka.ssl.keystore.password }}'
+        -e CONFIG_whisk_kafka_common_sslKeystoreLocation='/conf/{{ kafka.ssl.keystore.name }}'
+        -e CONFIG_whisk_kafka_common_sslKeystorePassword='{{ kafka.ssl.keystore.password }}'
         -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
         -e DB_PROTOCOL='{{ db_protocol }}'
         -e DB_PROVIDER='{{ db_provider }}'
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index cf59919..f7c3270 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -6,6 +6,67 @@
   retries: "{{ docker.pull.retries }}"
   delay: "{{ docker.pull.delay }}"
 
+- name: "create kafka certificate directory"
+  file:
+    path: "{{ config_root_dir }}/kafka/certs"
+    state: directory
+    mode: 0777
+
+- name: "copy keystore"
+  when: kafka.protocol == 'SSL'
+  copy:
+    src: "files/{{ kafka.ssl.keystore.name }}"
+    dest: "{{ config_root_dir }}/kafka/certs"
+
+- name: add kafka default env vars
+  set_fact:
+    kafka_env_vars:
+      "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+      "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
+      "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
+      "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+      "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
+
+- name: add kafka non-ssl vars
+  when: kafka.protocol != 'SSL'
+  set_fact:
+    kafka_non_ssl_vars:
+      "KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
+- name: add kafka ssl env vars
+  when: kafka.protocol == 'SSL'
+  set_fact:
+    kafka_ssl_env_vars:
+      "KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:SSL"
+      "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_PROTOCOL_NAME": "INTERNAL"
+      "KAFKA_SSL_KEYSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
+      "KAFKA_SSL_KEYSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
+      "KAFKA_SSL_KEY_PASSWORD": "{{ kafka.ssl.keystore.password }}"
+      "KAFKA_SSL_TRUSTSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
+      "KAFKA_SSL_TRUSTSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
+      "KAFKA_SSL_CLIENT_AUTH": "{{ kafka.ssl.client_authentication }}"
+    # The sed script passed in CUSTOM_INIT_SCRIPT fixes a bug in the wurstmeister docker image
+    # by patching the server.properties file right before kafka is started.
+    # The script adds the missing advertised hostname to the advertised.listeners property.
+    # Issue: https://github.com/wurstmeister/kafka-docker/issues/221
+      "CUSTOM_INIT_SCRIPT": sed -i \'/^advertised\\.listeners/ s/\\/\\/\\:/\\/\\/{{ ansible_host }}\\:/\' /opt/kafka/config/server.properties
+
+- name: "join kafka ssl env vars"
+  when: kafka.protocol == 'SSL'
+  set_fact:
+    kafka_env_vars: "{{ kafka_env_vars | combine(kafka_ssl_env_vars) }}"
+
+- name: join kafka non-ssl env vars
+  when: kafka.protocol != 'SSL'
+  set_fact:
+    kafka_env_vars: "{{ kafka_env_vars | combine(kafka_non_ssl_vars) }}"
+
 - name: (re)start kafka
   vars:
     zookeeper_idx: "{{ groups['kafkas'].index(inventory_hostname) % (groups['zookeepers'] | length) }}"
@@ -15,17 +76,12 @@
     state: started
     recreate: true
     restart_policy: "{{ docker.restart.policy }}"
-    env:
-      "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
-      "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
-      "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
-      "KAFKA_ADVERTISED_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
-      "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
-      "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
-      "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
-      "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
+    env: "{{ kafka_env_vars }}"
     ports:
-      - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
+      - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+      - "{{ kafka.advertisedPort  + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+    volumes:
+      - "{{ config_root_dir }}/kafka/certs:/config"
 
 - name: wait until the kafka server started up
   shell: docker logs kafka{{ groups['kafkas'].index(inventory_hostname) }}
diff --git a/ansible/roles/nginx/files/genssl.sh b/ansible/roles/nginx/files/genssl.sh
deleted file mode 100755
index e61e9b2..0000000
--- a/ansible/roles/nginx/files/genssl.sh
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/bin/bash
-
-set -e
-
-SCRIPTDIR="$(cd $(dirname "$0")/ && pwd)"
-
-if [ "$#" -lt 2 ]; then
-    echo "usage: $0 <common name: host or ip> [server|client]"
-    exit
-fi
-CN=$1
-TYPE=$2
-PASSWORD="openwhisk"
-
-## generates a (self-signed) certificate
-
-## uncomment to regenerate the key
-#openssl genrsa -out "$SCRIPTDIR/openwhisk-server-key.pem" 2048
-
-if [ "$TYPE" == "server" ]; then
-    echo generating server certificate request
-    openssl req -new \
-        -key "$SCRIPTDIR/openwhisk-server-key.pem" \
-        -nodes \
-        -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
-        -out "$SCRIPTDIR/openwhisk-server-request.csr"
-
-    echo generating self-signed password-less server certificate
-    openssl x509 -req \
-        -in "$SCRIPTDIR/openwhisk-server-request.csr" \
-        -signkey "$SCRIPTDIR/openwhisk-server-key.pem" \
-        -out "$SCRIPTDIR/openwhisk-server-cert.pem" \
-        -days 365
-else
-    echo generating client ca key
-    openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-ca-key.pem" 2048
-
-    echo generating client ca request
-    openssl req -new \
-    -key "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
-    -passin pass:$PASSWORD \
-    -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
-    -out "$SCRIPTDIR/openwhisk-client-ca.csr"
-
-    echo generating client ca pem
-    openssl x509 -req \
-    -in "$SCRIPTDIR/openwhisk-client-ca.csr" \
-    -signkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
-    -passin pass:$PASSWORD \
-    -out "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
-    -days 365 -sha1 -extensions v3_ca
-
-    echo generating client key
-    openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-key.pem" 2048
-
-    echo generating client certificate csr file
-    openssl req -new \
-    -key "$SCRIPTDIR/openwhisk-client-key.pem" \
-    -passin pass:$PASSWORD \
-    -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=guest" \
-    -out "$SCRIPTDIR/openwhisk-client-certificate-request.csr"
-
-    echo generating self-signed client certificate
-    echo 01 > $SCRIPTDIR/openwhisk-client-ca-cert.srl
-    openssl x509 -req \
-    -in "$SCRIPTDIR/openwhisk-client-certificate-request.csr" \
-    -CA "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
-    -CAkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
-    -CAserial "$SCRIPTDIR/openwhisk-client-ca-cert.srl" \
-    -passin pass:$PASSWORD \
-    -out "$SCRIPTDIR/openwhisk-client-cert.pem" \
-    -days 365 -sha1 -extensions v3_req
-
-    echo remove client key\'s password
-    openssl rsa \
-    -in "$SCRIPTDIR/openwhisk-client-key.pem" \
-    -passin pass:$PASSWORD \
-    -out "$SCRIPTDIR/openwhisk-client-key.pem"
-fi
diff --git a/ansible/setup.yml b/ansible/setup.yml
index a5c28f9..6a20d99 100644
--- a/ansible/setup.yml
+++ b/ansible/setup.yml
@@ -17,14 +17,34 @@
       docker_machine_ip: "{{ result.stdout }}"
     when: "'environments/docker-machine' in hosts_dir"
 
-  - name: gen hosts for docker-machine 
+  - name: gen hosts for docker-machine
     local_action: template src="{{playbook_dir}}/environments/docker-machine/hosts.j2.ini" dest="{{ playbook_dir }}/environments/docker-machine/hosts"
     when: "'environments/docker-machine' in hosts_dir"
 
   - name: gen untrusted server certificate for host
-    local_action: shell "{{ playbook_dir }}/roles/nginx/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "server"
+    local_action: shell "{{ playbook_dir }}/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "server" "{{ playbook_dir }}/roles/nginx/files"
     when: nginx.ssl.cert == "openwhisk-server-cert.pem"
-    
+
   - name: gen untrusted client certificate for host
-    local_action: shell "{{ playbook_dir }}/roles/nginx/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "client"
+    local_action: shell "{{ playbook_dir }}/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "client" "{{ playbook_dir }}/roles/nginx/files"
     when: nginx.ssl.client_ca_cert == "openwhisk-client-ca-cert.pem"
+
+  - name: clean up old kafka keystore
+    file:
+      path: "{{ playbook_dir }}/roles/kafka/files"
+      state: absent
+    become: "{{ logs.dir.become }}"
+    when: kafka_protocol_for_setup == 'SSL'
+
+  - name: ensure kafka files directory exists
+    file:
+      path: "{{ playbook_dir }}/roles/kafka/files/"
+      state: directory
+      mode: 0777
+    become: "{{ logs.dir.become }}"
+    when: kafka_protocol_for_setup == 'SSL'
+
+
+  - name: generate kafka certificates
+    local_action: shell "{{ playbook_dir }}/files/genssl.sh" "openwhisk-kafka" "server_with_JKS_keystore" "{{ playbook_dir }}/roles/kafka/files" openwhisk "generateKey" "kafka-"
+    when: kafka_protocol_for_setup == 'SSL'
diff --git a/ansible/tasks/writeWhiskProperties.yml b/ansible/tasks/writeWhiskProperties.yml
index 4f5fc14..e94de22 100644
--- a/ansible/tasks/writeWhiskProperties.yml
+++ b/ansible/tasks/writeWhiskProperties.yml
@@ -1,8 +1,13 @@
 ---
-# This task will write whisk.properties to the openwhisk_home. 
+# This task will write whisk.properties to the openwhisk_home.
 # Currently whisk.properties is still needed for tests.
 
 - name: write whisk.properties template to openwhisk_home
   template:
     src: whisk.properties.j2
     dest: "{{ openwhisk_home }}/whisk.properties"
+
+- name: write test's application conf overrides
+  template:
+    src: "{{ openwhisk_home }}/tests/src/test/resources/application.conf.j2"
+    dest: "{{ openwhisk_home }}/tests/src/test/resources/application.conf"
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 8468286..3e09bba 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -54,10 +54,25 @@ whisk {
     kafka {
         replication-factor = 1
 
+        common {
+            security-protocol = PLAINTEXT
+        }
         producer {
             acks = 1
             max-request-size = ${whisk.activation.payload.max}
         }
+        consumer {
+            session-timeout-ms = 30000
+            heartbeat-interval-ms = 10000
+            enable-auto-commit = true
+            auto-commit-interval-ms = 10000
+            auto-offset-reset = earliest
+            // Needs the -ms suffix like its siblings; Kafka's setting is max.poll.interval.ms.
+            max-poll-interval-ms = 360000
+            // Server-side wait time, which affects polling latency. A low value improves latency
+            // but setting it too low causes excessive busy-waiting.
+            fetch-max-wait-ms = 20
+        }
 
         topics {
             cache-invalidation {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index a1116d8..09cd3ac 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -23,23 +23,18 @@ import scala.collection.JavaConversions.iterableAsScalaIterable
 import scala.collection.JavaConversions.seqAsJavaList
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-
+import pureconfig.loadConfigOrThrow
 import whisk.common.Logging
+import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
 class KafkaConsumerConnector(kafkahost: String,
                              groupid: String,
                              topic: String,
-                             override val maxPeek: Int = Int.MaxValue,
-                             readeos: Boolean = true,
-                             sessionTimeout: FiniteDuration = 30.seconds,
-                             autoCommitInterval: FiniteDuration = 10.seconds,
-                             maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging)
+                             override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging)
     extends MessageConsumer {
 
   /**
@@ -68,19 +63,14 @@ class KafkaConsumerConnector(kafkahost: String,
     val props = new Properties
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid)
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
-    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toMillis.toString)
-    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, (sessionTimeout.toMillis / 3).toString)
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true.toString)
-    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval.toMillis.toString)
     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPeek.toString)
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (!readeos) "latest" else "earliest")
-    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)
-
-    // This value controls the server-side wait time which affects polling latency.
-    // A low value improves latency performance but it is important to not set it too low
-    // as that will cause excessive busy-waiting.
-    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "20")
 
+    val config =
+      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
+    config.foreach {
+      case (key, value) => props.put(key, value)
+    }
     props
   }
 
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index ac148cd..3c8df85 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -23,19 +23,16 @@ import java.util.concurrent.ExecutionException
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.FiniteDuration
 import scala.collection.JavaConverters._
-
 import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.clients.admin.AdminClient
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.errors.TopicExistsException
-
 import whisk.common.Logging
 import whisk.core.ConfigKeys
 import whisk.core.WhiskConfig
 import whisk.core.connector.MessageConsumer
 import whisk.core.connector.MessageProducer
 import whisk.core.connector.MessagingProvider
-
 import pureconfig._
 
 case class KafkaConfig(replicationFactor: Short)
@@ -44,9 +41,10 @@ case class KafkaConfig(replicationFactor: Short)
  * A Kafka based implementation of MessagingProvider
  */
 object KafkaMessagingProvider extends MessagingProvider {
+
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging): MessageConsumer =
-    new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
+    new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek)
 
   def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
     new KafkaProducerConnector(config.kafkaHosts, ec)
@@ -57,6 +55,14 @@ object KafkaMessagingProvider extends MessagingProvider {
       loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
     val props = new Properties
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
+    val commonConfig =
+      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
+    commonConfig.foreach {
+      case (key, value) => {
+        props.put(key, value)
+      }
+    }
+
     val client = AdminClient.create(props)
     val numPartitions = 1
     val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 7b0977b..dca7cce 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -89,8 +89,12 @@ class KafkaProducerConnector(kafkahosts: String,
 
     // Load additional config from the config files and add them here.
     val config =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
-    config.foreach { case (key, value) => props.put(key, value) }
+      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
+
+    config.foreach {
+      case (key, value) => props.put(key, value)
+    }
     props
   }
 
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index cf66627..79c6d89 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -236,7 +236,9 @@ object ConfigKeys {
   val loadbalancer = "whisk.loadbalancer"
 
   val kafka = "whisk.kafka"
+  val kafkaCommon = s"$kafka.common"
   val kafkaProducer = s"$kafka.producer"
+  val kafkaConsumer = s"$kafka.consumer"
   val kafkaTopics = s"$kafka.topics"
 
   val memory = "whisk.memory"
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf.j2
similarity index 55%
rename from tests/src/test/resources/application.conf
rename to tests/src/test/resources/application.conf.j2
index 53f420d..94f1664 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf.j2
@@ -20,5 +20,15 @@ whisk {
                 retention-ms    = 3600000
             }
         }
+        common {
+          security-protocol: {{ kafka.protocol }}
+          ssl-truststore-location: {{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}
+          ssl-truststore-password: {{ kafka.ssl.keystore.password }}
+          ssl-keystore-location: {{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}
+          ssl-keystore-password: {{ kafka.ssl.keystore.password }}
+        }
+        consumer {
+          max-poll-interval-ms: 10000
+        }
     }
 }
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 7761856..d987cb7 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -46,6 +46,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
 
   val groupid = "kafkatest"
   val topic = "KafkaConnectorTestTopic"
+  val maxPollInterval = 10 seconds
 
   // Need to overwrite replication factor for tests that shut down and start
   // Kafka instances intentionally. These tests will fail if there is more than
@@ -56,15 +57,11 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
   assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
 
-  val sessionTimeout: FiniteDuration = 10 seconds
-  val maxPollInterval: FiniteDuration = 10 seconds
+  println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}")
+  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed")
+
   val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
-  val consumer = new KafkaConsumerConnector(
-    config.kafkaHosts,
-    groupid,
-    topic,
-    sessionTimeout = sessionTimeout,
-    maxPollInterval = maxPollInterval)
+  val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
 
   override def afterAll(): Unit = {
     producer.close()

-- 
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.