Posted to commits@kafka.apache.org by kk...@apache.org on 2022/03/30 20:30:54 UTC

[kafka] branch 3.2 updated: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (#11908)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 42c8040  KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (#11908)
42c8040 is described below

commit 42c804005e196ee6218d570c4394614613fe1fb9
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Wed Mar 30 13:15:42 2022 -0700

    KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (#11908)
    
    With this change, the non-production-grade connectors that are meant for demos and quick starts are no longer included by default in the CLASSPATH and plugin.path of Connect deployments. The package containing these connectors will still be shipped with the Apache Kafka distribution and will be available for explicit inclusion.
    
    The changes have been tested through the system tests and the existing unit and integration tests.
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
 bin/kafka-run-class.sh                             |  4 +--
 docs/connect.html                                  |  1 +
 docs/quickstart.html                               | 19 ++++++++--
 tests/kafkatest/services/connect.py                | 42 +++++++++++++++++++---
 .../tests/connect/connect_distributed_test.py      | 12 ++++---
 tests/kafkatest/tests/connect/connect_rest_test.py |  3 +-
 tests/kafkatest/tests/connect/connect_test.py      | 12 ++++---
 7 files changed, 73 insertions(+), 20 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 6167583..490f930 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -32,7 +32,7 @@ if [ -z "$INCLUDE_TEST_JARS" ]; then
 fi
 
 # Exclude jars not necessary for running commands.
-regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
+regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$"
 should_include_file() {
   if [ "$INCLUDE_TEST_JARS" = true ]; then
     return 0
@@ -171,7 +171,7 @@ do
   CLASSPATH="$CLASSPATH:$dir/*"
 done
 
-for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
+for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
 do
   for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
   do
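
Note on the bin/kafka-run-class.sh hunks above: the effect of the extended exclusion pattern can be checked outside the shell. The sketch below is not part of the commit. It copies the new regex verbatim and assumes that the part of should_include_file not shown in the hunk treats the pattern as a search against the file name. The jar names are hypothetical and only exercise each alternative of the pattern.

```python
import re

# Pattern copied verbatim from the updated kafka-run-class.sh.
EXCLUDE_REGEX = re.compile(
    r"(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$"
)


def should_include_file(file_name, include_test_jars=False):
    """Python stand-in for the shell helper of the same name (an assumption
    about the lines not shown in the hunk): everything is kept when test jars
    are requested, otherwise only names the regex does not match are kept."""
    if include_test_jars:
        return True
    return EXCLUDE_REGEX.search(file_name) is None


# Hypothetical jar names, not taken from the commit.
candidates = [
    "connect-file-3.2.0.jar",
    "connect-runtime-3.2.0.jar",
    "kafka-clients-3.2.0-test.jar",
    "kafka-clients-3.2.0.jar",
]
included = [name for name in candidates if should_include_file(name)]
print(included)
```

Under these assumptions only the runtime and clients jars remain. Because the shell function returns early when INCLUDE_TEST_JARS is true, the file connector jar would not be filtered out in that mode.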
diff --git a/docs/connect.html b/docs/connect.html
index 66d6212..1251c3c 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -48,6 +48,7 @@
         <li><code>bootstrap.servers</code> - List of Kafka servers used to bootstrap connections to Kafka</li>
         <li><code>key.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
         <li><code>value.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
+        <li><code>plugin.path</code> (default <code>empty</code>) - a list of paths that contain Connect plugins (connectors, converters, transformations). Before running quick starts, users must add the absolute path that contains the example FileStreamSourceConnector and FileStreamSinkConnector packaged in <code>connect-file-"version".jar</code>, because these connectors are not included by default to the <code>CLASSPATH</code> or the <code>plugin.path</code> of the Connect worker (see [...]
     </ul>
 
     <p>The important configuration options specific to standalone mode are:</p>
diff --git a/docs/quickstart.html b/docs/quickstart.html
index 2ef56c8..70b1314 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -32,8 +32,8 @@
             the latest Kafka release and extract it:
         </p>
 
-        <pre class="line-numbers"><code class="language-bash">$ tar -xzf kafka_2.13-3.1.0.tgz
-$ cd kafka_2.13-3.1.0</code></pre>
+        <pre class="line-numbers"><code class="language-bash">$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+$ cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
     </div>
 
     <div class="quickstart-step">
@@ -173,7 +173,20 @@ This is my second event</code></pre>
         </p>
 
         <p>
-            First, we'll start by creating some seed data to test with:
+            First, make sure to add <code class="language-bash">connect-file-{{fullDotVersion}}.jar</code> to the <code>plugin.path</code> property in the Connect worker's configuration.
+            For the purpose of this quickstart we'll use a relative path and consider the connectors' package as an uber jar, which works when the quickstart commands are run from the installation directory.
+            However, it's worth noting that for production deployments using absolute paths is always preferable. See <a href="#connectconfigs_plugin.path">plugin.path</a> for a detailed description of how to set this config.
+        </p>
+
+        <p>
+            Edit the <code class="language-bash">config/connect-standalone.properties</code> file, add or change the <code>plugin.path</code> configuration property to match the following, and save the file:
+        </p>
+
+        <pre class="brush: bash;">
+&gt; echo "plugin.path=lib/connect-file-{{fullDotVersion}}.jar"</pre>
+
+        <p>
+            Then, start by creating some seed data to test with:
         </p>
 
         <pre class="brush: bash;">
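
Note on the docs/quickstart.html hunk above: the echo command shown in the new text only prints the property line, so the worker configuration still has to be edited by hand. A minimal sketch of the "add or change" step follows. It is not part of the commit, the helper name set_plugin_path is hypothetical, and 3.2.0 stands in for the {{fullDotVersion}} placeholder. It handles only the simple key=value form.

```python
def set_plugin_path(properties_text, plugin_path):
    """Return properties_text with plugin.path set to plugin_path.

    An existing uncommented plugin.path entry is replaced; otherwise a new
    entry is appended. Commented-out lines are left untouched. Only the
    key=value form is recognised, which is a simplification of the full
    Java properties syntax.
    """
    new_line = "plugin.path=%s" % plugin_path
    lines = properties_text.splitlines()
    replaced = False
    for i, line in enumerate(lines):
        key = line.split("=", 1)[0].strip()
        if key == "plugin.path":
            lines[i] = new_line
            replaced = True
    if not replaced:
        lines.append(new_line)
    return "\n".join(lines) + "\n"


# Hypothetical starting content of a worker properties file.
sample = "bootstrap.servers=localhost:9092\n#plugin.path=\n"
updated = set_plugin_path(sample, "lib/connect-file-3.2.0.jar")
print(updated)
```

As the quickstart text itself says, a relative path like this one only resolves when the commands are run from the installation directory. An absolute path is preferable for anything beyond a demo.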
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 26c0d92..41c33cc 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -69,7 +69,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
+    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec=60,
+                 include_filestream_connectors=False):
         super(ConnectServiceBase, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
@@ -78,6 +79,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.startup_timeout_sec = startup_timeout_sec
         self.environment = {}
         self.external_config_template_func = None
+        self.include_filestream_connectors = include_filestream_connectors
+        self.logger.debug("include_filestream_connectors % s", include_filestream_connectors)
 
     def pids(self, node):
         """Return process ids for Kafka Connect processes."""
@@ -279,12 +282,34 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
         self.environment[envvar] = env_opts
 
+    def append_filestream_connectors_to_classpath(self):
+        if self.include_filestream_connectors:
+            cwd = os.getcwd()
+            self.logger.info("Including filestream connectors when starting Connect. "
+                             "Looking for jar locally in: %s" % cwd)
+            relative_path = "/connect/file/build/libs/"
+            local_dir = cwd + relative_path
+            lib_dir = self.path.home() + relative_path
+            for pwd, dirs, files in os.walk(local_dir):
+                for file in files:
+                    if file.startswith("connect-file") and file.endswith(".jar"):
+                        # Use the expected directory on the node instead of the path in the driver node
+                        file_path = lib_dir + file
+                        self.logger.debug("Appending %s to Connect worker's CLASSPATH" % file_path)
+                        return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
+            self.logger.info("Jar with filestream connectors was not found under %s" % lib_dir)
+        else:
+            self.logger.info("Starting Connect without filestream connectors in the CLASSPATH")
+
+        return None
+
 
 class ConnectStandaloneService(ConnectServiceBase):
     """Runs Kafka Connect in standalone mode."""
 
-    def __init__(self, context, kafka, files, startup_timeout_sec = 60):
-        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec)
+    def __init__(self, context, kafka, files, startup_timeout_sec=60, include_filestream_connectors=False):
+        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec,
+                                                       include_filestream_connectors)
 
     # For convenience since this service only makes sense with a single node
     @property
@@ -299,6 +324,9 @@ class ConnectStandaloneService(ConnectServiceBase):
 
         cmd += fix_opts_for_new_jvm(node)
         cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
+        classpath = self.append_filestream_connectors_to_classpath()
+        cmd += classpath if classpath else ""
+
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
         cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE)
@@ -339,8 +367,9 @@ class ConnectDistributedService(ConnectServiceBase):
     """Runs Kafka Connect in distributed mode."""
 
     def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
-                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
-        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
+                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec=60,
+                 include_filestream_connectors=False):
+        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec, include_filestream_connectors)
         self.startup_mode = self.STARTUP_MODE_JOIN
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
@@ -355,6 +384,9 @@ class ConnectDistributedService(ConnectServiceBase):
         cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
+
+        classpath = self.append_filestream_connectors_to_classpath()
+        cmd += classpath if classpath else ""
         cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
         return cmd
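
Note on the tests/kafkatest/services/connect.py hunks above: append_filestream_connectors_to_classpath looks for the jar on the driver but returns a path under the node's Kafka home. The standalone sketch below illustrates that split. It is not the committed code: the function name classpath_export_for and the /opt/kafka-dev path are hypothetical, and it lists a single directory instead of walking the tree with os.walk.

```python
import os
import posixpath
import tempfile


def classpath_export_for(local_dir, remote_lib_dir):
    """Return an 'export CLASSPATH=...; ' prefix if a connect-file jar exists
    in local_dir, else None. The jar is detected locally, but the exported
    path points at remote_lib_dir, where the same file name is expected to
    exist on the worker node."""
    if not os.path.isdir(local_dir):
        return None
    for file_name in sorted(os.listdir(local_dir)):
        if file_name.startswith("connect-file") and file_name.endswith(".jar"):
            remote_path = posixpath.join(remote_lib_dir, file_name)
            return "export CLASSPATH=${CLASSPATH}:%s; " % remote_path
    return None


# Create a throwaway directory with an empty, hypothetical jar to search.
with tempfile.TemporaryDirectory() as local_dir:
    open(os.path.join(local_dir, "connect-file-3.2.0.jar"), "w").close()
    prefix = classpath_export_for(local_dir, "/opt/kafka-dev/connect/file/build/libs")

# Same fallback as in the diff: an absent jar contributes nothing to the command.
cmd = (prefix or "") + "connect-standalone.sh connect.properties"
print(cmd)
```

Returning None when no jar is found lets the caller append the prefix conditionally, which matches the `cmd += classpath if classpath else ""` lines in the diff.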
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 6bc52b0..970779f 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -80,7 +80,7 @@ class ConnectDistributedTest(Test):
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
 
-    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False):
+    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                   topics=self.topics, version=broker_version,
@@ -89,7 +89,8 @@ class ConnectDistributedTest(Test):
             for node in self.kafka.nodes:
                 node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
 
-        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE],
+                                            include_filestream_connectors=include_filestream_connectors)
         self.cc.log_level = "DEBUG"
 
         self.zk.start()
@@ -370,7 +371,7 @@ class ConnectDistributedTest(Test):
         """
 
         self.CONNECT_PROTOCOL = connect_protocol
-        self.setup_services(security_protocol=security_protocol)
+        self.setup_services(security_protocol=security_protocol, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
@@ -522,7 +523,7 @@ class ConnectDistributedTest(Test):
     @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
     def test_transformations(self, connect_protocol):
         self.CONNECT_PROTOCOL = connect_protocol
-        self.setup_services(timestamp_type='CreateTime')
+        self.setup_services(timestamp_type='CreateTime', include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
@@ -610,7 +611,8 @@ class ConnectDistributedTest(Test):
         or relies upon the broker to auto-create the topics (v0.10.0.x and before).
         """
         self.CONNECT_PROTOCOL = connect_protocol
-        self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics, security_protocol=security_protocol)
+        self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics,
+                            security_protocol=security_protocol, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 4d978a2..ff44d94 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -73,7 +73,8 @@ class ConnectRestApiTest(KafkaTest):
             'test': {'partitions': 1, 'replication-factor': 1}
         })
 
-        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
+        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE],
+                                            include_filestream_connectors=True)
 
     @cluster(num_nodes=4)
     @matrix(connect_protocol=['compatible', 'eager'])
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 1a7f6ab..4c2a91a 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -91,8 +91,10 @@ class ConnectStandaloneFileTest(Test):
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                   topics=self.topics, controller_num_nodes_override=self.num_zk)
 
-        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE],
+                                               include_filestream_connectors=True)
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
+                                             include_filestream_connectors=True)
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC_TEST,
                                                   consumer_timeout_ms=10000)
 
@@ -164,8 +166,10 @@ class ConnectStandaloneFileTest(Test):
         else:
             faulty_records = faulty_records[0]
 
-        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE],
+                                               include_filestream_connectors=True)
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
+                                             include_filestream_connectors=True)
 
         self.zk.start()
         self.kafka.start()