You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/20 07:36:28 UTC
[incubator-openwhisk] branch master updated: Secure
connections/data-in-motion to Kafka using SSL. (#3258)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new d53cc32 Secure connections/data-in-motion to Kafka using SSL. (#3258)
d53cc32 is described below
commit d53cc320fea6ea11b1bfdd90da9ac87c39e89e6c
Author: Vadim Raskin <ra...@gmail.com>
AuthorDate: Tue Feb 20 08:36:25 2018 +0100
Secure connections/data-in-motion to Kafka using SSL. (#3258)
---
.gitignore | 3 +
ansible/files/genssl.sh | 119 +++++++++++++++++++++
ansible/group_vars/all | 13 ++-
ansible/roles/controller/tasks/deploy.yml | 13 ++-
ansible/roles/invoker/tasks/deploy.yml | 11 ++
ansible/roles/kafka/tasks/deploy.yml | 76 +++++++++++--
ansible/roles/nginx/files/genssl.sh | 79 --------------
ansible/setup.yml | 28 ++++-
ansible/tasks/writeWhiskProperties.yml | 7 +-
common/scala/src/main/resources/application.conf | 15 +++
.../connector/kafka/KafkaConsumerConnector.scala | 28 ++---
.../connector/kafka/KafkaMessagingProvider.scala | 14 ++-
.../connector/kafka/KafkaProducerConnector.scala | 8 +-
.../src/main/scala/whisk/core/WhiskConfig.scala | 2 +
.../{application.conf => application.conf.j2} | 10 ++
.../test/scala/services/KafkaConnectorTests.scala | 13 +--
16 files changed, 308 insertions(+), 131 deletions(-)
diff --git a/.gitignore b/.gitignore
index bfb7f48..710e6f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
nginx.conf
whisk.properties
default.props
+/tests/src/test/resources/application.conf
.ant-targets-build.xml
/results/
@@ -60,6 +61,8 @@ ansible/tmp/*
ansible/roles/nginx/files/openwhisk-client*
ansible/roles/nginx/files/*.csr
ansible/roles/nginx/files/*cert.pem
+ansible/roles/nginx/files/*p12
+ansible/roles/kafka/files/*
# .zip files must be explicited whitelisted
*.zip
diff --git a/ansible/files/genssl.sh b/ansible/files/genssl.sh
new file mode 100755
index 0000000..a506a7e
--- /dev/null
+++ b/ansible/files/genssl.sh
@@ -0,0 +1,119 @@
+#!/bin/bash
+
+set -e
+
+PASSWORD="openwhisk" # default password for generated keys and keystores
+
+if [ "$#" -lt 3 ]; then
+ echo "usage: $0 <common name: host or ip> [server|server_with_JKS_keystore|client] <scriptdir> <OPTIONAL:TruststorePassword> <OPTIONAL:generateKey> <OPTIONAL:namePrefix>"
+ exit 1 # a usage error must not report success to the caller
+fi
+CN=$1
+TYPE=$2
+SCRIPTDIR=$3
+export TRUSTSTORE_PASSWORD=${4:-$PASSWORD} # exported for keytool's :env options; falls back to $PASSWORD, not the literal word
+GENKEY=$5 # when non-empty, a fresh server key is generated below
+NAME_PREFIX=$6 # prepended to the generated server file names
+
+
+
+## generates a (self-signed) certificate
+if [[ -n $GENKEY ]]
+then
+ openssl genrsa -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" 2048
+fi
+function gen_csr(){
+ echo generating server certificate request
+ openssl req -new \
+ -key "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" \
+ -nodes \
+ -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
+ -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-request.csr"
+}
+function gen_cert(){ # self-signs the request written by gen_csr with the server key; valid for 365 days
+ echo generating self-signed password-less server certificate
+ openssl x509 -req \
+ -in "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-request.csr" \
+ -signkey "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" \
+ -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-cert.pem" \
+ -days 365
+}
+
+if [ "$TYPE" == "server_with_JKS_keystore" ]; then
+ gen_csr
+ gen_cert
+ echo generate new key and place it in the keystore
+ keytool -genkey -v \
+ -alias $CN \
+ -dname "C=US,ST=NY,L=Yorktown,O=OpenWhisk,CN=$CN" \
+ -keystore ${SCRIPTDIR}/${NAME_PREFIX}keystore.jks \
+ -keypass:env TRUSTSTORE_PASSWORD \
+ -storepass:env TRUSTSTORE_PASSWORD \
+ -keyalg RSA \
+ -ext KeyUsage:critical="keyCertSign" \
+ -ext BasicConstraints:critical="ca:true" \
+ -validity 365
+ echo export private key from the keystore
+ keytool -keystore ${SCRIPTDIR}/${NAME_PREFIX}keystore.jks -alias $CN -certreq -file ${SCRIPTDIR}/${NAME_PREFIX}cert-file -storepass:env TRUSTSTORE_PASSWORD
+ echo sign the certificate with private key
+ openssl x509 -req -CA ${SCRIPTDIR}/${NAME_PREFIX}openwhisk-server-cert.pem -CAkey "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" -in ${SCRIPTDIR}/${NAME_PREFIX}cert-file -out ${SCRIPTDIR}/${NAME_PREFIX}cert-signed -days 365 -CAcreateserial -passin pass:$TRUSTSTORE_PASSWORD
+ echo import CA cert in the keystore
+ keytool -keystore ${SCRIPTDIR}/${NAME_PREFIX}keystore.jks -alias CARoot -import -file ${SCRIPTDIR}/${NAME_PREFIX}openwhisk-server-cert.pem -storepass:env TRUSTSTORE_PASSWORD -noprompt
+ echo import the private key in the keystore
+ keytool -keystore ${SCRIPTDIR}/${NAME_PREFIX}keystore.jks -alias $CN -import -file ${SCRIPTDIR}/${NAME_PREFIX}cert-signed -storepass:env TRUSTSTORE_PASSWORD -noprompt
+
+elif [ "$TYPE" == "server" ]; then
+ gen_csr
+ gen_cert
+ echo generate keystore
+ openssl pkcs12 -export -name $CN \
+ -passout pass:$TRUSTSTORE_PASSWORD \
+ -in "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-cert.pem" \
+ -inkey "$SCRIPTDIR/${NAME_PREFIX}openwhisk-server-key.pem" \
+ -out "$SCRIPTDIR/${NAME_PREFIX}openwhisk-keystore.p12"
+else
+ echo generating client ca key
+ openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-ca-key.pem" 2048
+
+ echo generating client ca request
+ openssl req -new \
+ -key "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
+ -passin pass:$PASSWORD \
+ -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
+ -out "$SCRIPTDIR/openwhisk-client-ca.csr"
+
+ echo generating client ca pem
+ openssl x509 -req \
+ -in "$SCRIPTDIR/openwhisk-client-ca.csr" \
+ -signkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
+ -passin pass:$PASSWORD \
+ -out "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
+ -days 365 -sha1 -extensions v3_ca
+
+ echo generating client key
+ openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-key.pem" 2048
+
+ echo generating client certificate csr file
+ openssl req -new \
+ -key "$SCRIPTDIR/openwhisk-client-key.pem" \
+ -passin pass:$PASSWORD \
+ -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=guest" \
+ -out "$SCRIPTDIR/openwhisk-client-certificate-request.csr"
+
+ echo generating self-signed client certificate
+ echo 01 > $SCRIPTDIR/openwhisk-client-ca-cert.srl
+ openssl x509 -req \
+ -in "$SCRIPTDIR/openwhisk-client-certificate-request.csr" \
+ -CA "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
+ -CAkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
+ -CAserial "$SCRIPTDIR/openwhisk-client-ca-cert.srl" \
+ -passin pass:$PASSWORD \
+ -out "$SCRIPTDIR/openwhisk-client-cert.pem" \
+ -days 365 -sha1 -extensions v3_req
+
+ echo remove client key\'s password
+ openssl rsa \
+ -in "$SCRIPTDIR/openwhisk-client-key.pem" \
+ -passin pass:$PASSWORD \
+ -out "$SCRIPTDIR/openwhisk-client-key.pem"
+fi
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 23eeeac..8356733 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -75,8 +75,15 @@ registry:
confdir: "{{ config_root_dir }}/registry"
kafka:
+ ssl:
+ client_authentication: required
+ keystore:
+ name: kafka-keystore.jks
+ password: openwhisk
+ protocol: "{{ kafka_protocol_for_setup }}"
version: 0.11.0.1
- port: 9092
+ port: 9072
+ advertisedPort: 9093
ras:
port: 8093
heap: "{{ kafka_heap | default('1g') }}"
@@ -84,10 +91,12 @@ kafka:
kafka_connect_string: "{% set ret = [] %}\
{% for host in groups['kafkas'] %}\
- {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.port+loop.index-1)|string) ) }}\
+ {{ ret.append( hostvars[host].ansible_host + ':' + ((kafka.advertisedPort+loop.index-1)|string) ) }}\
{% endfor %}\
{{ ret | join(',') }}"
+kafka_protocol_for_setup: "{{ kafka_protocol | default('PLAINTEXT') }}"
+
zookeeper:
version: 3.4
port: 2181
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index f075664..52dbc80 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -45,6 +45,12 @@
dest: "{{ controller.confdir }}/{{ controller_name }}/jmxremote.access"
mode: 0777
+- name: "copy kafka truststore/keystore"
+ when: kafka.protocol == 'SSL'
+ copy:
+ src: "{{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}"
+ dest: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}"
+
- name: check, that required databases exist
include: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
vars:
@@ -106,7 +112,11 @@
"CONFIG_whisk_kafka_topics_health_retentionBytes": "{{ kafka_topics_health_retentionBytes | default() }}"
"CONFIG_whisk_kafka_topics_health_retentionMs": "{{ kafka_topics_health_retentionMS | default() }}"
"CONFIG_whisk_kafka_topics_health_segmentBytes": "{{ kafka_topics_health_segmentBytes | default() }}"
-
+ "CONFIG_whisk_kafka_common_securityProtocol": "{{ kafka.protocol }}"
+ "CONFIG_whisk_kafka_common_sslTruststoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
+ "CONFIG_whisk_kafka_common_sslTruststorePassword": "{{ kafka.ssl.keystore.password }}"
+ "CONFIG_whisk_kafka_common_sslKeystoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
+ "CONFIG_whisk_kafka_common_sslKeystorePassword": "{{ kafka.ssl.keystore.password }}"
"DB_PROTOCOL": "{{ db_protocol }}"
"DB_PROVIDER": "{{ db_provider }}"
"DB_HOST": "{{ db_host }}"
@@ -157,7 +167,6 @@
"CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
"CONFIG_whisk_spi_LoadBalancerProvider": "{{ controller.loadbalancer.spi }}"
-
"CONFIG_logback_log_level": "{{ controller.loglevel }}"
"CONFIG_whisk_transactions_stride": "{{ transactions.stride | default() }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index be2a251..0f3bdc6 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -60,6 +60,12 @@
mode: 0777
become: "{{ invoker.dir.become }}"
+- name: "copy kafka truststore/keystore"
+ when: kafka.protocol == 'SSL'
+ copy:
+ src: "{{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}"
+ dest: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}"
+
- name: check, that required databases exist
include: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
vars:
@@ -149,6 +155,11 @@
-e CONFIG_whisk_kafka_topics_invoker_retentionBytes='{{ kafka_topics_invoker_retentionBytes | default() }}'
-e CONFIG_whisk_kafka_topics_invoker_retentionMs='{{ kafka_topics_invoker_retentionMS | default() }}'
-e CONFIG_whisk_kakfa_topics_invoker_segmentBytes='{{ kafka_topics_invoker_segmentBytes | default() }}'
+ -e CONFIG_whisk_kafka_common_securityProtocol='{{ kafka.protocol }}'
+ -e CONFIG_whisk_kafka_common_sslTruststoreLocation='/conf/{{ kafka.ssl.keystore.name }}'
+ -e CONFIG_whisk_kafka_common_sslTruststorePassword='{{ kafka.ssl.keystore.password }}'
+ -e CONFIG_whisk_kafka_common_sslKeystoreLocation='/conf/{{ kafka.ssl.keystore.name }}'
+ -e CONFIG_whisk_kafka_common_sslKeystorePassword='{{ kafka.ssl.keystore.password }}'
-e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
-e DB_PROTOCOL='{{ db_protocol }}'
-e DB_PROVIDER='{{ db_provider }}'
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index cf59919..f7c3270 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -6,6 +6,67 @@
retries: "{{ docker.pull.retries }}"
delay: "{{ docker.pull.delay }}"
+- name: "create kafka certificate directory"
+ file:
+ path: "{{ config_root_dir }}/kafka/certs"
+ state: directory
+ mode: 0777
+
+- name: "copy keystore"
+ when: kafka.protocol == 'SSL'
+ copy:
+ src: "files/{{ kafka.ssl.keystore.name }}"
+ dest: "{{ config_root_dir }}/kafka/certs"
+
+- name: add kafka default env vars
+ set_fact:
+ kafka_env_vars:
+ "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+ "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
+ "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
+ "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+ "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
+
+- name: add kafka non-ssl vars
+ when: kafka.protocol != 'SSL'
+ set_fact:
+ kafka_non_ssl_vars:
+ "KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
+- name: add kafka ssl env vars
+ when: kafka.protocol == 'SSL'
+ set_fact:
+ kafka_ssl_env_vars:
+ "KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:SSL"
+ "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_PROTOCOL_NAME": "INTERNAL"
+ "KAFKA_SSL_KEYSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
+ "KAFKA_SSL_KEYSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
+ "KAFKA_SSL_KEY_PASSWORD": "{{ kafka.ssl.keystore.password }}"
+ "KAFKA_SSL_TRUSTSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
+ "KAFKA_SSL_TRUSTSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
+ "KAFKA_SSL_CLIENT_AUTH": "{{ kafka.ssl.client_authentication }}"
+ # The sed script passed in CUSTOM_INIT_SCRIPT fixes a bug in the wurstmeister docker image
+ # by patching the server.properties file right before kafka is started.
+ # The script adds the missing advertised hostname to the advertised.listeners property.
+ # Issue: https://github.com/wurstmeister/kafka-docker/issues/221
+ "CUSTOM_INIT_SCRIPT": sed -i \'/^advertised\\.listeners/ s/\\/\\/\\:/\\/\\/{{ ansible_host }}\\:/\' /opt/kafka/config/server.properties
+
+- name: "join kafka ssl env vars"
+ when: kafka.protocol == 'SSL'
+ set_fact:
+ kafka_env_vars: "{{ kafka_env_vars | combine(kafka_ssl_env_vars) }}"
+
+- name: join kafka non-ssl env vars
+ when: kafka.protocol != 'SSL'
+ set_fact:
+ kafka_env_vars: "{{ kafka_env_vars | combine(kafka_non_ssl_vars) }}"
+
- name: (re)start kafka
vars:
zookeeper_idx: "{{ groups['kafkas'].index(inventory_hostname) % (groups['zookeepers'] | length) }}"
@@ -15,17 +76,12 @@
state: started
recreate: true
restart_policy: "{{ docker.restart.policy }}"
- env:
- "KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
- "KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
- "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
- "KAFKA_ADVERTISED_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
- "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
- "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
- "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
- "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
+ env: "{{ kafka_env_vars }}"
ports:
- - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
+ - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+ - "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+ volumes:
+ - "{{ config_root_dir }}/kafka/certs:/config"
- name: wait until the kafka server started up
shell: docker logs kafka{{ groups['kafkas'].index(inventory_hostname) }}
diff --git a/ansible/roles/nginx/files/genssl.sh b/ansible/roles/nginx/files/genssl.sh
deleted file mode 100755
index e61e9b2..0000000
--- a/ansible/roles/nginx/files/genssl.sh
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/bin/bash
-
-set -e
-
-SCRIPTDIR="$(cd $(dirname "$0")/ && pwd)"
-
-if [ "$#" -lt 2 ]; then
- echo "usage: $0 <common name: host or ip> [server|client]"
- exit
-fi
-CN=$1
-TYPE=$2
-PASSWORD="openwhisk"
-
-## generates a (self-signed) certificate
-
-## uncomment to regenerate the key
-#openssl genrsa -out "$SCRIPTDIR/openwhisk-server-key.pem" 2048
-
-if [ "$TYPE" == "server" ]; then
- echo generating server certificate request
- openssl req -new \
- -key "$SCRIPTDIR/openwhisk-server-key.pem" \
- -nodes \
- -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
- -out "$SCRIPTDIR/openwhisk-server-request.csr"
-
- echo generating self-signed password-less server certificate
- openssl x509 -req \
- -in "$SCRIPTDIR/openwhisk-server-request.csr" \
- -signkey "$SCRIPTDIR/openwhisk-server-key.pem" \
- -out "$SCRIPTDIR/openwhisk-server-cert.pem" \
- -days 365
-else
- echo generating client ca key
- openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-ca-key.pem" 2048
-
- echo generating client ca request
- openssl req -new \
- -key "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
- -passin pass:$PASSWORD \
- -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=$CN" \
- -out "$SCRIPTDIR/openwhisk-client-ca.csr"
-
- echo generating client ca pem
- openssl x509 -req \
- -in "$SCRIPTDIR/openwhisk-client-ca.csr" \
- -signkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
- -passin pass:$PASSWORD \
- -out "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
- -days 365 -sha1 -extensions v3_ca
-
- echo generating client key
- openssl genrsa -aes256 -passout pass:$PASSWORD -out "$SCRIPTDIR/openwhisk-client-key.pem" 2048
-
- echo generating client certificate csr file
- openssl req -new \
- -key "$SCRIPTDIR/openwhisk-client-key.pem" \
- -passin pass:$PASSWORD \
- -subj "/C=US/ST=NY/L=Yorktown/O=OpenWhisk/CN=guest" \
- -out "$SCRIPTDIR/openwhisk-client-certificate-request.csr"
-
- echo generating self-signed client certificate
- echo 01 > $SCRIPTDIR/openwhisk-client-ca-cert.srl
- openssl x509 -req \
- -in "$SCRIPTDIR/openwhisk-client-certificate-request.csr" \
- -CA "$SCRIPTDIR/openwhisk-client-ca-cert.pem" \
- -CAkey "$SCRIPTDIR/openwhisk-client-ca-key.pem" \
- -CAserial "$SCRIPTDIR/openwhisk-client-ca-cert.srl" \
- -passin pass:$PASSWORD \
- -out "$SCRIPTDIR/openwhisk-client-cert.pem" \
- -days 365 -sha1 -extensions v3_req
-
- echo remove client key\'s password
- openssl rsa \
- -in "$SCRIPTDIR/openwhisk-client-key.pem" \
- -passin pass:$PASSWORD \
- -out "$SCRIPTDIR/openwhisk-client-key.pem"
-fi
diff --git a/ansible/setup.yml b/ansible/setup.yml
index a5c28f9..6a20d99 100644
--- a/ansible/setup.yml
+++ b/ansible/setup.yml
@@ -17,14 +17,34 @@
docker_machine_ip: "{{ result.stdout }}"
when: "'environments/docker-machine' in hosts_dir"
- - name: gen hosts for docker-machine
+ - name: gen hosts for docker-machine
local_action: template src="{{playbook_dir}}/environments/docker-machine/hosts.j2.ini" dest="{{ playbook_dir }}/environments/docker-machine/hosts"
when: "'environments/docker-machine' in hosts_dir"
- name: gen untrusted server certificate for host
- local_action: shell "{{ playbook_dir }}/roles/nginx/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "server"
+ local_action: shell "{{ playbook_dir }}/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "server" "{{ playbook_dir }}/roles/nginx/files"
when: nginx.ssl.cert == "openwhisk-server-cert.pem"
-
+
- name: gen untrusted client certificate for host
- local_action: shell "{{ playbook_dir }}/roles/nginx/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "client"
+ local_action: shell "{{ playbook_dir }}/files/genssl.sh" "*.{{ whisk_api_localhost_name | default(whisk_api_host_name) | default(whisk_api_localhost_name_default) }}" "client" "{{ playbook_dir }}/roles/nginx/files"
when: nginx.ssl.client_ca_cert == "openwhisk-client-ca-cert.pem"
+
+ - name: clean up old kafka keystore
+ file:
+ path: "{{ playbook_dir }}/roles/kafka/files"
+ state: absent
+ become: "{{ logs.dir.become }}"
+ when: kafka_protocol_for_setup == 'SSL'
+
+ - name: ensure kafka files directory exists
+ file:
+ path: "{{ playbook_dir }}/roles/kafka/files/"
+ state: directory
+ mode: 0777
+ become: "{{ logs.dir.become }}"
+ when: kafka_protocol_for_setup == 'SSL'
+
+
+ - name: generate kafka certificates
+ local_action: shell "{{ playbook_dir }}/files/genssl.sh" "openwhisk-kafka" "server_with_JKS_keystore" "{{ playbook_dir }}/roles/kafka/files" openwhisk "generateKey" "kafka-"
+ when: kafka_protocol_for_setup == 'SSL'
diff --git a/ansible/tasks/writeWhiskProperties.yml b/ansible/tasks/writeWhiskProperties.yml
index 4f5fc14..e94de22 100644
--- a/ansible/tasks/writeWhiskProperties.yml
+++ b/ansible/tasks/writeWhiskProperties.yml
@@ -1,8 +1,13 @@
---
-# This task will write whisk.properties to the openwhisk_home.
+# This task will write whisk.properties to the openwhisk_home.
# Currently whisk.properties is still needed for tests.
- name: write whisk.properties template to openwhisk_home
template:
src: whisk.properties.j2
dest: "{{ openwhisk_home }}/whisk.properties"
+
+- name: write test's application conf overrides
+ template:
+ src: "{{ openwhisk_home }}/tests/src/test/resources/application.conf.j2"
+ dest: "{{ openwhisk_home }}/tests/src/test/resources/application.conf"
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 8468286..3e09bba 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -54,10 +54,25 @@ whisk {
kafka {
replication-factor = 1
+ common {
+ security-protocol = PLAINTEXT
+ }
producer {
acks = 1
max-request-size = ${whisk.activation.payload.max}
}
+ consumer {
+ session-timeout-ms = 30000
+ heartbeat-interval-ms = 10000
+ enable-auto-commit = true
+ auto-commit-interval-ms = 10000
+ auto-offset-reset = earliest
+ max-poll-interval-ms = 360000 // needs the -ms suffix, like the other keys here, to correspond to Kafka's max.poll.interval.ms
+ // This value controls the server-side wait time which affects polling latency.
+ // A low value improves latency performance but it is important to not set it too low
+ // as that will cause excessive busy-waiting.
+ fetch-max-wait-ms = 20
+ }
topics {
cache-invalidation {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index a1116d8..09cd3ac 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -23,23 +23,18 @@ import scala.collection.JavaConversions.iterableAsScalaIterable
import scala.collection.JavaConversions.seqAsJavaList
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
-
+import pureconfig.loadConfigOrThrow
import whisk.common.Logging
+import whisk.core.ConfigKeys
import whisk.core.connector.MessageConsumer
class KafkaConsumerConnector(kafkahost: String,
groupid: String,
topic: String,
- override val maxPeek: Int = Int.MaxValue,
- readeos: Boolean = true,
- sessionTimeout: FiniteDuration = 30.seconds,
- autoCommitInterval: FiniteDuration = 10.seconds,
- maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging)
+ override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging)
extends MessageConsumer {
/**
@@ -68,19 +63,14 @@ class KafkaConsumerConnector(kafkahost: String,
val props = new Properties
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toMillis.toString)
- props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, (sessionTimeout.toMillis / 3).toString)
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true.toString)
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval.toMillis.toString)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPeek.toString)
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (!readeos) "latest" else "earliest")
- props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)
-
- // This value controls the server-side wait time which affects polling latency.
- // A low value improves latency performance but it is important to not set it too low
- // as that will cause excessive busy-waiting.
- props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "20")
+ val config =
+ KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+ KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
+ config.foreach {
+ case (key, value) => props.put(key, value)
+ }
props
}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index ac148cd..3c8df85 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -23,19 +23,16 @@ import java.util.concurrent.ExecutionException
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.collection.JavaConverters._
-
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.errors.TopicExistsException
-
import whisk.common.Logging
import whisk.core.ConfigKeys
import whisk.core.WhiskConfig
import whisk.core.connector.MessageConsumer
import whisk.core.connector.MessageProducer
import whisk.core.connector.MessagingProvider
-
import pureconfig._
case class KafkaConfig(replicationFactor: Short)
@@ -44,9 +41,10 @@ case class KafkaConfig(replicationFactor: Short)
* A Kafka based implementation of MessagingProvider
*/
object KafkaMessagingProvider extends MessagingProvider {
+
def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
implicit logging: Logging): MessageConsumer =
- new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek, maxPollInterval = maxPollInterval)
+ new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek)
def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
new KafkaProducerConnector(config.kafkaHosts, ec)
@@ -57,6 +55,14 @@ object KafkaMessagingProvider extends MessagingProvider {
loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
val props = new Properties
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
+ val commonConfig =
+ KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
+ commonConfig.foreach {
+ case (key, value) => {
+ props.put(key, value)
+ }
+ }
+
val client = AdminClient.create(props)
val numPartitions = 1
val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 7b0977b..dca7cce 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -89,8 +89,12 @@ class KafkaProducerConnector(kafkahosts: String,
// Load additional config from the config files and add them here.
val config =
- KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
- config.foreach { case (key, value) => props.put(key, value) }
+ KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+ KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
+
+ config.foreach {
+ case (key, value) => props.put(key, value)
+ }
props
}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index cf66627..79c6d89 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -236,7 +236,9 @@ object ConfigKeys {
val loadbalancer = "whisk.loadbalancer"
val kafka = "whisk.kafka"
+ val kafkaCommon = s"$kafka.common"
val kafkaProducer = s"$kafka.producer"
+ val kafkaConsumer = s"$kafka.consumer"
val kafkaTopics = s"$kafka.topics"
val memory = "whisk.memory"
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf.j2
similarity index 55%
rename from tests/src/test/resources/application.conf
rename to tests/src/test/resources/application.conf.j2
index 53f420d..94f1664 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf.j2
@@ -20,5 +20,15 @@ whisk {
retention-ms = 3600000
}
}
+ common {
+ security-protocol: {{ kafka.protocol }}
+ ssl-truststore-location: {{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}
+ ssl-truststore-password: {{ kafka.ssl.keystore.password }}
+ ssl-keystore-location: {{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}
+ ssl-keystore-password: {{ kafka.ssl.keystore.password }}
+ }
+ consumer {
+ max-poll-interval-ms: 10000
+ }
}
}
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 7761856..d987cb7 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -46,6 +46,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
val groupid = "kafkatest"
val topic = "KafkaConnectorTestTopic"
+ val maxPollInterval = 10 seconds
// Need to overwrite replication factor for tests that shut down and start
// Kafka instances intentionally. These tests will fail if there is more than
@@ -56,15 +57,11 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
- val sessionTimeout: FiniteDuration = 10 seconds
- val maxPollInterval: FiniteDuration = 10 seconds
+ println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}")
+ assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed")
+
val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
- val consumer = new KafkaConsumerConnector(
- config.kafkaHosts,
- groupid,
- topic,
- sessionTimeout = sessionTimeout,
- maxPollInterval = maxPollInterval)
+ val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
override def afterAll(): Unit = {
producer.close()
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.