You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/13 01:19:11 UTC

kafka git commit: KAFKA-3520: Add system tests for REST APIs of list connector plugins and config validation

Repository: kafka
Updated Branches:
  refs/heads/trunk 527b98d82 -> 81f76bde8


KAFKA-3520: Add system tests for REST APIs of list connector plugins and config validation

ewen granders Ready for review.

Author: Liquan Pei <li...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1195 from Ishiihara/system-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/81f76bde
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/81f76bde
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/81f76bde

Branch: refs/heads/trunk
Commit: 81f76bde8565eaffd67e5adaa69ddfdb4f5cebaa
Parents: 527b98d
Author: Liquan Pei <li...@gmail.com>
Authored: Thu May 12 18:19:00 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 12 18:19:00 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/connect.py             |  9 ++++-
 .../tests/connect/connect_distributed_test.py   |  3 ++
 .../tests/connect/connect_rest_test.py          | 42 +++++++++++++++-----
 tests/kafkatest/tests/connect/connect_test.py   |  3 ++
 .../templates/connect-file-sink.properties      |  2 +-
 .../templates/connect-file-source.properties    |  2 +-
 6 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index aad9ff3..5371a72 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -101,7 +101,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def clean_node(self, node):
         node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
         self.security_config.clean_node(node)
-        node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False)
+        all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files)
+        node.account.ssh("rm -rf " + all_files, allow_fail=False)
 
     def config_filenames(self):
         return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])]
@@ -140,6 +141,12 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def resume_connector(self, name, node=None):
         return self._rest('/connectors/' + name + '/resume', method="PUT")
 
+    def list_connector_plugins(self, node=None):
+        return self._rest('/connector-plugins/', node=node)
+
+    def validate_config(self, connector_type, validate_request, node=None):
+        return self._rest('/connector-plugins/' + connector_type + '/config/validate', validate_request, node=node, method="PUT")
+
     def _rest(self, path, body=None, node=None, method="GET"):
         if node is None:
             node = random.choice(self.nodes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index d3ae2e1..a4d68f3 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -32,6 +32,9 @@ class ConnectDistributedTest(Test):
     another, validating the total output is identical to the input.
     """
 
+    FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector'
+    FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector'
+
     INPUT_FILE = "/mnt/connect.input"
     OUTPUT_FILE = "/mnt/connect.output"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 63b9bb1..c32b8e1 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -15,7 +15,6 @@
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
-from kafkatest.utils.util import retry_on_exception
 from ducktape.utils.util import wait_until
 import subprocess
 import json
@@ -27,6 +26,12 @@ class ConnectRestApiTest(KafkaTest):
     Test of Kafka Connect's REST API endpoints.
     """
 
+    FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector'
+    FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector'
+
+    FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topic', 'file'}
+    FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topics', 'file'}
+
     INPUT_FILE = "/mnt/connect.input"
     INPUT_FILE2 = "/mnt/connect.input2"
     OUTPUT_FILE = "/mnt/connect.output"
@@ -43,11 +48,11 @@ class ConnectRestApiTest(KafkaTest):
     LONGER_INPUT_LIST = ["foo", "bar", "baz", "razz", "ma", "tazz"]
     LONER_INPUTS = "\n".join(LONGER_INPUT_LIST) + "\n"
 
-    SCHEMA = { "type": "string", "optional": False }
+    SCHEMA = {"type": "string", "optional": False}
 
     def __init__(self, test_context):
         super(ConnectRestApiTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
-            'test' : { 'partitions': 1, 'replication-factor': 1 }
+            'test': {'partitions': 1, 'replication-factor': 1}
         })
 
         self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
@@ -64,12 +69,23 @@ class ConnectRestApiTest(KafkaTest):
 
         assert self.cc.list_connectors() == []
 
-        self.logger.info("Creating connectors")
+        assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR}
+
         source_connector_props = self.render("connect-file-source.properties")
         sink_connector_props = self.render("connect-file-sink.properties")
-        for connector_props in [source_connector_props, sink_connector_props]:
-            connector_config = self._config_dict_from_props(connector_props)
-            self.cc.create_connector(connector_config, retries=120, retry_backoff=1)
+
+        self.logger.info("Validating connector configurations")
+        source_connector_config = self._config_dict_from_props(source_connector_props)
+        configs = self.cc.validate_config(self.FILE_SOURCE_CONNECTOR, source_connector_config)
+        self.verify_config(self.FILE_SOURCE_CONNECTOR, self.FILE_SOURCE_CONFIGS, configs)
+
+        sink_connector_config = self._config_dict_from_props(sink_connector_props)
+        configs = self.cc.validate_config(self.FILE_SINK_CONNECTOR, sink_connector_config)
+        self.verify_config(self.FILE_SINK_CONNECTOR, self.FILE_SINK_CONFIGS, configs)
+
+        self.logger.info("Creating connectors")
+        self.cc.create_connector(source_connector_config, retries=120, retry_backoff=1)
+        self.cc.create_connector(sink_connector_config, retries=120, retry_backoff=1)
 
         # We should see the connectors appear
         wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source", "local-file-sink"]),
@@ -91,7 +107,7 @@ class ConnectRestApiTest(KafkaTest):
         expected_source_info = {
             'name': 'local-file-source',
             'config': self._config_dict_from_props(source_connector_props),
-            'tasks': [{ 'connector': 'local-file-source', 'task': 0 }]
+            'tasks': [{'connector': 'local-file-source', 'task': 0}]
         }
         source_info = self.cc.get_connector("local-file-source")
         assert expected_source_info == source_info, "Incorrect info:" + json.dumps(source_info)
@@ -100,7 +116,7 @@ class ConnectRestApiTest(KafkaTest):
         expected_sink_info = {
             'name': 'local-file-sink',
             'config': self._config_dict_from_props(sink_connector_props),
-            'tasks': [{'connector': 'local-file-sink', 'task': 0 }]
+            'tasks': [{'connector': 'local-file-sink', 'task': 0}]
         }
         sink_info = self.cc.get_connector("local-file-sink")
         assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info)
@@ -164,3 +180,11 @@ class ConnectRestApiTest(KafkaTest):
     def _config_dict_from_props(self, connector_props):
         return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
 
+    def verify_config(self, name, config_def, configs):
+        # Response should echo the connector class that was validated
+        assert name == configs['name']
+        # Should have zero errors
+        assert 0 == configs['error_count']
+        # Should return all configuration
+        config_names = [config['definition']['name'] for config in configs['configs']]
+        assert config_def == set(config_names)

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 7b57402..9184390 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -31,6 +31,9 @@ class ConnectStandaloneFileTest(Test):
     identical to the input.
     """
 
+    FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector'
+    FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector'
+
     INPUT_FILE = "/mnt/connect.input"
     OUTPUT_FILE = "/mnt/connect.output"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
index ad78bb3..216dab5 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 name=local-file-sink
-connector.class=FileStreamSink
+connector.class={{ FILE_SINK_CONNECTOR }}
 tasks.max=1
 file={{ OUTPUT_FILE }}
 topics={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f76bde/tests/kafkatest/tests/connect/templates/connect-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-source.properties b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
index d2d5e97..bff9720 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-source.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 name=local-file-source
-connector.class=FileStreamSource
+connector.class={{ FILE_SOURCE_CONNECTOR }}
 tasks.max=1
 file={{ INPUT_FILE }}
 topic={{ TOPIC }}
\ No newline at end of file