You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2022/03/30 20:47:30 UTC

[kafka] branch 3.0 updated: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (#11908)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new b1a5091  KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (#11908)
b1a5091 is described below

commit b1a5091973fcffaeb1540366e57c773482e9e262
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Wed Mar 30 13:15:42 2022 -0700

    KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (#11908)
    
    With this change we stop including the non-production grade connectors that are meant to be used for demos and quick starts by default in the CLASSPATH and plugin.path of Connect deployments. The package of these connectors will still be shipped with the Apache Kafka distribution and will be available for explicit inclusion.
    
    The changes have been tested through the system tests and the existing unit and integration tests.
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
 bin/kafka-run-class.sh                             |  4 +--
 docs/connect.html                                  |  1 +
 docs/quickstart.html                               |  4 +--
 tests/kafkatest/services/connect.py                | 42 +++++++++++++++++++---
 .../tests/connect/connect_distributed_test.py      | 12 ++++---
 tests/kafkatest/tests/connect/connect_rest_test.py |  3 +-
 tests/kafkatest/tests/connect/connect_test.py      | 12 ++++---
 7 files changed, 59 insertions(+), 19 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 6167583..490f930 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -32,7 +32,7 @@ if [ -z "$INCLUDE_TEST_JARS" ]; then
 fi
 
 # Exclude jars not necessary for running commands.
-regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
+regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$"
 should_include_file() {
   if [ "$INCLUDE_TEST_JARS" = true ]; then
     return 0
@@ -171,7 +171,7 @@ do
   CLASSPATH="$CLASSPATH:$dir/*"
 done
 
-for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
+for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
 do
   for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
   do
diff --git a/docs/connect.html b/docs/connect.html
index 07f8778..f1b4da1 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -49,6 +49,7 @@
         <li><code>bootstrap.servers</code> - List of Kafka servers used to bootstrap connections to Kafka</li>
         <li><code>key.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
         <li><code>value.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
+        <li><code>plugin.path</code> (default <code>empty</code>) - a list of paths that contain Connect plugins (connectors, converters, transformations). Before running quick starts, users must add the absolute path that contains the example FileStreamSourceConnector and FileStreamSinkConnector packaged in <code>connect-file-"version".jar</code>, because these connectors are not included by default to the <code>CLASSPATH</code> or the <code>plugin.path</code> of the Connect worker (see [...]
     </ul>
 
     <p>The important configuration options specific to standalone mode are:</p>
diff --git a/docs/quickstart.html b/docs/quickstart.html
index eda3ede..f7a6471 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -32,8 +32,8 @@
             the latest Kafka release and extract it:
         </p>
 
-        <pre class="line-numbers"><code class="language-bash">$ tar -xzf kafka_2.13-2.8.0.tgz
-$ cd kafka_2.13-2.8.0</code></pre>
+        <pre class="line-numbers"><code class="language-bash">$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+$ cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
     </div>
 
     <div class="quickstart-step">
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 26c0d92..41c33cc 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -69,7 +69,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
+    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec=60,
+                 include_filestream_connectors=False):
         super(ConnectServiceBase, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
@@ -78,6 +79,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.startup_timeout_sec = startup_timeout_sec
         self.environment = {}
         self.external_config_template_func = None
+        self.include_filestream_connectors = include_filestream_connectors
+        self.logger.debug("include_filestream_connectors % s", include_filestream_connectors)
 
     def pids(self, node):
         """Return process ids for Kafka Connect processes."""
@@ -279,12 +282,34 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
         self.environment[envvar] = env_opts
 
+    def append_filestream_connectors_to_classpath(self):
+        if self.include_filestream_connectors:
+            cwd = os.getcwd()
+            self.logger.info("Including filestream connectors when starting Connect. "
+                             "Looking for jar locally in: %s" % cwd)
+            relative_path = "/connect/file/build/libs/"
+            local_dir = cwd + relative_path
+            lib_dir = self.path.home() + relative_path
+            for pwd, dirs, files in os.walk(local_dir):
+                for file in files:
+                    if file.startswith("connect-file") and file.endswith(".jar"):
+                        # Use the expected directory on the node instead of the path in the driver node
+                        file_path = lib_dir + file
+                        self.logger.debug("Appending %s to Connect worker's CLASSPATH" % file_path)
+                        return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
+            self.logger.info("Jar with filestream connectors was not found under %s" % lib_dir)
+        else:
+            self.logger.info("Starting Connect without filestream connectors in the CLASSPATH")
+
+        return None
+
 
 class ConnectStandaloneService(ConnectServiceBase):
     """Runs Kafka Connect in standalone mode."""
 
-    def __init__(self, context, kafka, files, startup_timeout_sec = 60):
-        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec)
+    def __init__(self, context, kafka, files, startup_timeout_sec=60, include_filestream_connectors=False):
+        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec,
+                                                       include_filestream_connectors)
 
     # For convenience since this service only makes sense with a single node
     @property
@@ -299,6 +324,9 @@ class ConnectStandaloneService(ConnectServiceBase):
 
         cmd += fix_opts_for_new_jvm(node)
         cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
+        classpath = self.append_filestream_connectors_to_classpath()
+        cmd += classpath if classpath else ""
+
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
         cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE)
@@ -339,8 +367,9 @@ class ConnectDistributedService(ConnectServiceBase):
     """Runs Kafka Connect in distributed mode."""
 
     def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
-                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
-        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
+                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec=60,
+                 include_filestream_connectors=False):
+        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec, include_filestream_connectors)
         self.startup_mode = self.STARTUP_MODE_JOIN
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
@@ -355,6 +384,9 @@ class ConnectDistributedService(ConnectServiceBase):
         cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
+
+        classpath = self.append_filestream_connectors_to_classpath()
+        cmd += classpath if classpath else ""
         cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
         return cmd
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 6bc52b0..970779f 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -80,7 +80,7 @@ class ConnectDistributedTest(Test):
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
 
-    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False):
+    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                   topics=self.topics, version=broker_version,
@@ -89,7 +89,8 @@ class ConnectDistributedTest(Test):
             for node in self.kafka.nodes:
                 node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
 
-        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE],
+                                            include_filestream_connectors=include_filestream_connectors)
         self.cc.log_level = "DEBUG"
 
         self.zk.start()
@@ -370,7 +371,7 @@ class ConnectDistributedTest(Test):
         """
 
         self.CONNECT_PROTOCOL = connect_protocol
-        self.setup_services(security_protocol=security_protocol)
+        self.setup_services(security_protocol=security_protocol, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
@@ -522,7 +523,7 @@ class ConnectDistributedTest(Test):
     @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
     def test_transformations(self, connect_protocol):
         self.CONNECT_PROTOCOL = connect_protocol
-        self.setup_services(timestamp_type='CreateTime')
+        self.setup_services(timestamp_type='CreateTime', include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
@@ -610,7 +611,8 @@ class ConnectDistributedTest(Test):
         or relies upon the broker to auto-create the topics (v0.10.0.x and before).
         """
         self.CONNECT_PROTOCOL = connect_protocol
-        self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics, security_protocol=security_protocol)
+        self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics,
+                            security_protocol=security_protocol, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 4d978a2..ff44d94 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -73,7 +73,8 @@ class ConnectRestApiTest(KafkaTest):
             'test': {'partitions': 1, 'replication-factor': 1}
         })
 
-        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
+        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE],
+                                            include_filestream_connectors=True)
 
     @cluster(num_nodes=4)
     @matrix(connect_protocol=['compatible', 'eager'])
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 1a7f6ab..4c2a91a 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -91,8 +91,10 @@ class ConnectStandaloneFileTest(Test):
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                   topics=self.topics, controller_num_nodes_override=self.num_zk)
 
-        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE],
+                                               include_filestream_connectors=True)
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
+                                             include_filestream_connectors=True)
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC_TEST,
                                                   consumer_timeout_ms=10000)
 
@@ -164,8 +166,10 @@ class ConnectStandaloneFileTest(Test):
         else:
             faulty_records = faulty_records[0]
 
-        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE],
+                                               include_filestream_connectors=True)
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
+                                             include_filestream_connectors=True)
 
         self.zk.start()
         self.kafka.start()