You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/13 01:19:11 UTC
kafka git commit: KAFKA-3520: Add system tests for REST APIs of list
connector plugins and config validation
Repository: kafka
Updated Branches:
refs/heads/trunk 527b98d82 -> 81f76bde8
KAFKA-3520: Add system tests for REST APIs of list connector plugins and config validation
ewen granders Ready for review.
Author: Liquan Pei <li...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1195 from Ishiihara/system-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/81f76bde
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/81f76bde
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/81f76bde
Branch: refs/heads/trunk
Commit: 81f76bde8565eaffd67e5adaa69ddfdb4f5cebaa
Parents: 527b98d
Author: Liquan Pei <li...@gmail.com>
Authored: Thu May 12 18:19:00 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 12 18:19:00 2016 -0700
----------------------------------------------------------------------
tests/kafkatest/services/connect.py | 9 ++++-
.../tests/connect/connect_distributed_test.py | 3 ++
.../tests/connect/connect_rest_test.py | 42 +++++++++++++++-----
tests/kafkatest/tests/connect/connect_test.py | 3 ++
.../templates/connect-file-sink.properties | 2 +-
.../templates/connect-file-source.properties | 2 +-
6 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index aad9ff3..5371a72 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -101,7 +101,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
def clean_node(self, node):
node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
self.security_config.clean_node(node)
- node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False)
+ all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files)
+ node.account.ssh("rm -rf " + all_files, allow_fail=False)
def config_filenames(self):
return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])]
@@ -140,6 +141,12 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
def resume_connector(self, name, node=None):
return self._rest('/connectors/' + name + '/resume', method="PUT")
+ def list_connector_plugins(self, node=None):
+ return self._rest('/connector-plugins/', node=node)
+
+ def validate_config(self, connector_type, validate_request, node=None):
+ return self._rest('/connector-plugins/' + connector_type + '/config/validate', validate_request, node=node, method="PUT")
+
def _rest(self, path, body=None, node=None, method="GET"):
if node is None:
node = random.choice(self.nodes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index d3ae2e1..a4d68f3 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -32,6 +32,9 @@ class ConnectDistributedTest(Test):
another, validating the total output is identical to the input.
"""
+ FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector'
+ FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector'
+
INPUT_FILE = "/mnt/connect.input"
OUTPUT_FILE = "/mnt/connect.output"
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 63b9bb1..c32b8e1 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -15,7 +15,6 @@
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
-from kafkatest.utils.util import retry_on_exception
from ducktape.utils.util import wait_until
import subprocess
import json
@@ -27,6 +26,12 @@ class ConnectRestApiTest(KafkaTest):
Test of Kafka Connect's REST API endpoints.
"""
+ FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector'
+ FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector'
+
+ FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topic', 'file'}
+ FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topics', 'file'}
+
INPUT_FILE = "/mnt/connect.input"
INPUT_FILE2 = "/mnt/connect.input2"
OUTPUT_FILE = "/mnt/connect.output"
@@ -43,11 +48,11 @@ class ConnectRestApiTest(KafkaTest):
LONGER_INPUT_LIST = ["foo", "bar", "baz", "razz", "ma", "tazz"]
LONER_INPUTS = "\n".join(LONGER_INPUT_LIST) + "\n"
- SCHEMA = { "type": "string", "optional": False }
+ SCHEMA = {"type": "string", "optional": False}
def __init__(self, test_context):
super(ConnectRestApiTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
- 'test' : { 'partitions': 1, 'replication-factor': 1 }
+ 'test': {'partitions': 1, 'replication-factor': 1}
})
self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
@@ -64,12 +69,23 @@ class ConnectRestApiTest(KafkaTest):
assert self.cc.list_connectors() == []
- self.logger.info("Creating connectors")
+ assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR}
+
source_connector_props = self.render("connect-file-source.properties")
sink_connector_props = self.render("connect-file-sink.properties")
- for connector_props in [source_connector_props, sink_connector_props]:
- connector_config = self._config_dict_from_props(connector_props)
- self.cc.create_connector(connector_config, retries=120, retry_backoff=1)
+
+ self.logger.info("Validating connector configurations")
+ source_connector_config = self._config_dict_from_props(source_connector_props)
+ configs = self.cc.validate_config(self.FILE_SOURCE_CONNECTOR, source_connector_config)
+ self.verify_config(self.FILE_SOURCE_CONNECTOR, self.FILE_SOURCE_CONFIGS, configs)
+
+ sink_connector_config = self._config_dict_from_props(sink_connector_props)
+ configs = self.cc.validate_config(self.FILE_SINK_CONNECTOR, sink_connector_config)
+ self.verify_config(self.FILE_SINK_CONNECTOR, self.FILE_SINK_CONFIGS, configs)
+
+ self.logger.info("Creating connectors")
+ self.cc.create_connector(source_connector_config, retries=120, retry_backoff=1)
+ self.cc.create_connector(sink_connector_config, retries=120, retry_backoff=1)
# We should see the connectors appear
wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source", "local-file-sink"]),
@@ -91,7 +107,7 @@ class ConnectRestApiTest(KafkaTest):
expected_source_info = {
'name': 'local-file-source',
'config': self._config_dict_from_props(source_connector_props),
- 'tasks': [{ 'connector': 'local-file-source', 'task': 0 }]
+ 'tasks': [{'connector': 'local-file-source', 'task': 0}]
}
source_info = self.cc.get_connector("local-file-source")
assert expected_source_info == source_info, "Incorrect info:" + json.dumps(source_info)
@@ -100,7 +116,7 @@ class ConnectRestApiTest(KafkaTest):
expected_sink_info = {
'name': 'local-file-sink',
'config': self._config_dict_from_props(sink_connector_props),
- 'tasks': [{'connector': 'local-file-sink', 'task': 0 }]
+ 'tasks': [{'connector': 'local-file-sink', 'task': 0}]
}
sink_info = self.cc.get_connector("local-file-sink")
assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info)
@@ -164,3 +180,11 @@ class ConnectRestApiTest(KafkaTest):
def _config_dict_from_props(self, connector_props):
return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+ def verify_config(self, name, config_def, configs):
+ # Should have zero errors
+ assert name == configs['name']
+ # Should have zero errors
+ assert 0 == configs['error_count']
+ # Should return all configuration
+ config_names = [config['definition']['name'] for config in configs['configs']]
+ assert config_def == set(config_names)
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 7b57402..9184390 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -31,6 +31,9 @@ class ConnectStandaloneFileTest(Test):
identical to the input.
"""
+ FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector'
+ FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector'
+
INPUT_FILE = "/mnt/connect.input"
OUTPUT_FILE = "/mnt/connect.output"
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
index ad78bb3..216dab5 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
@@ -14,7 +14,7 @@
# limitations under the License.
name=local-file-sink
-connector.class=FileStreamSink
+connector.class={{ FILE_SINK_CONNECTOR }}
tasks.max=1
file={{ OUTPUT_FILE }}
topics={{ TOPIC }}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/templates/connect-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-source.properties b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
index d2d5e97..bff9720 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-source.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
@@ -14,7 +14,7 @@
# limitations under the License.
name=local-file-source
-connector.class=FileStreamSource
+connector.class={{ FILE_SOURCE_CONNECTOR }}
tasks.max=1
file={{ INPUT_FILE }}
topic={{ TOPIC }}
\ No newline at end of file