You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/06 02:09:22 UTC
[15/21] kafka git commit: KAFKA-3464: Add system tests for Connect
with Kafka security enabled
KAFKA-3464: Add system tests for Connect with Kafka security enabled
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Ismael Juma, Gwen Shapira
Closes #1141 from ewencp/kafka-3464-connect-security-system-tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98978139
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98978139
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98978139
Branch: refs/heads/0.10.0
Commit: 9897813957c1b5cd70a0dd02094f184f73e06979
Parents: 80ba01e
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Mon Apr 4 18:49:29 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700
----------------------------------------------------------------------
tests/kafkatest/services/connect.py | 6 ++++
tests/kafkatest/services/mirror_maker.py | 1 -
.../services/security/security_config.py | 24 ++++++++-----
.../tests/connect/connect_distributed_test.py | 36 +++++++++++++++----
tests/kafkatest/tests/connect/connect_test.py | 37 +++++++++++++++-----
.../templates/connect-distributed.properties | 7 ++--
.../templates/connect-standalone.properties | 4 ++-
7 files changed, 86 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 76336e1..51dade3 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -48,6 +48,7 @@ class ConnectServiceBase(Service):
def __init__(self, context, num_nodes, kafka, files):
super(ConnectServiceBase, self).__init__(context, num_nodes)
self.kafka = kafka
+ self.security_config = kafka.security_config.client_config()
self.files = files
def pids(self, node):
@@ -89,6 +90,7 @@ class ConnectServiceBase(Service):
def clean_node(self, node):
node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
+ self.security_config.clean_node(node)
node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False)
def config_filenames(self):
@@ -153,6 +155,7 @@ class ConnectStandaloneService(ConnectServiceBase):
def start_cmd(self, node, connector_configs):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+ cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
cmd += " ".join(connector_configs)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
@@ -161,6 +164,7 @@ class ConnectStandaloneService(ConnectServiceBase):
def start_node(self, node):
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+ self.security_config.setup_node(node)
node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
remote_connector_configs = []
@@ -190,6 +194,7 @@ class ConnectDistributedService(ConnectServiceBase):
def start_cmd(self, node):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+ cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
return cmd
@@ -197,6 +202,7 @@ class ConnectDistributedService(ConnectServiceBase):
def start_node(self, node):
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+ self.security_config.setup_node(node)
node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
if self.connector_config_templates:
http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 4386788..cb4b2c1 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -1,4 +1,3 @@
-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index b5efba8..1bbabd2 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -17,6 +17,7 @@ import os
import subprocess
from ducktape.template import TemplateRenderer
from kafkatest.services.security.minikdc import MiniKdc
+import itertools
class Keytool(object):
@@ -172,17 +173,22 @@ class SecurityConfig(TemplateRenderer):
else:
return ""
- def __str__(self):
+ def props(self, prefix=''):
"""
- Return properties as string with line separators.
+ Return properties as string with line separators, optionally with a prefix.
This is used to append security config properties to
a properties file.
+ :param prefix: prefix to add to each property
+ :return: a string containing line-separated properties
"""
+ if self.security_protocol == SecurityConfig.PLAINTEXT:
+ return ""
+ config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems())
+ # Extra blank lines ensure this can be appended/prepended safely
+ return "\n".join(itertools.chain([""], config_lines, [""]))
- prop_str = ""
- if self.security_protocol != SecurityConfig.PLAINTEXT:
- for key, value in self.properties.items():
- prop_str += ("\n" + key + "=" + value)
- prop_str += "\n"
- return prop_str
-
+ def __str__(self):
+ """
+ Return properties as a string with line separators.
+ """
+ return self.props()
http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 9aa16ab..698a827 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -13,15 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from kafkatest.tests.kafka_test import KafkaTest
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
import subprocess, itertools, time
from collections import Counter
-class ConnectDistributedTest(KafkaTest):
+class ConnectDistributedTest(Test):
"""
Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
another, validating the total output is identical to the input.
@@ -45,22 +49,39 @@ class ConnectDistributedTest(KafkaTest):
SCHEMA = { "type": "string", "optional": False }
def __init__(self, test_context):
- super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+ super(ConnectDistributedTest, self).__init__(test_context)
+ self.num_zk = 1
+ self.num_brokers = 1
+ self.topics = {
'test' : { 'partitions': 1, 'replication-factor': 1 }
- })
+ }
+
+ self.zk = ZookeeperService(test_context, self.num_zk)
- self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
- self.cc.log_level = "DEBUG"
self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
self.schemas = True
- def test_file_source_and_sink(self):
+ def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT):
+ self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
+ security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
+ topics=self.topics)
+
+ self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+ self.cc.log_level = "DEBUG"
+
+ self.zk.start()
+ self.kafka.start()
+
+
+ @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+ def test_file_source_and_sink(self, security_protocol):
"""
Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
"""
+ self.setup_services(security_protocol=security_protocol)
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
self.cc.start()
@@ -94,6 +115,7 @@ class ConnectDistributedTest(KafkaTest):
"""
num_tasks = 3
+ self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
self.cc.start()
http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 90f219a..7b57402 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -13,14 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from kafkatest.tests.kafka_test import KafkaTest
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectStandaloneService
from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
+from ducktape.mark import parametrize, matrix
import hashlib, subprocess, json
-class ConnectStandaloneFileTest(KafkaTest):
+class ConnectStandaloneFileTest(Test):
"""
Simple test of Kafka Connect that produces data from a file in one
standalone process and consumes it on another, validating the output is
@@ -42,24 +46,39 @@ class ConnectStandaloneFileTest(KafkaTest):
SCHEMA = { "type": "string", "optional": False }
def __init__(self, test_context):
- super(ConnectStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+ super(ConnectStandaloneFileTest, self).__init__(test_context)
+ self.num_zk = 1
+ self.num_brokers = 1
+ self.topics = {
'test' : { 'partitions': 1, 'replication-factor': 1 }
- })
+ }
- self.source = ConnectStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
- self.sink = ConnectStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
- self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
+ self.zk = ZookeeperService(test_context, self.num_zk)
@parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
@parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
@parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
- def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
+ @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+ def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
assert converter != None, "converter type must be set"
# Template parameters
self.key_converter = converter
self.value_converter = converter
self.schemas = schemas
+ self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
+ security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
+ topics=self.topics)
+
+ self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
+ self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+ self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
+ consumer_timeout_ms=1000, new_consumer=True)
+
+
+ self.zk.start()
+ self.kafka.start()
+
self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])
http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 7a7440a..48f5f78 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-bootstrap.servers={{ kafka.bootstrap_servers() }}
+bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
+{{ kafka.security_config.client_config().props() }}
+{{ kafka.security_config.client_config().props("producer.") }}
+{{ kafka.security_config.client_config().props("consumer.") }}
group.id={{ group|default("connect-cluster") }}
@@ -43,4 +46,4 @@ rest.advertised.host.name = {{ node.account.hostname }}
# Reduce session timeouts so tests that kill workers don't need to wait as long to recover
session.timeout.ms=10000
-consumer.session.timeout.ms=10000
\ No newline at end of file
+consumer.session.timeout.ms=10000
http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/templates/connect-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index bf1daf7..09c6487 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -13,7 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-bootstrap.servers={{ kafka.bootstrap_servers() }}
+bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
+{{ kafka.security_config.client_config().props("producer.") }}
+{{ kafka.security_config.client_config().props("consumer.") }}
key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}