You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2024/02/06 11:37:33 UTC

(camel-kafka-connector) 01/03: Updated to kafka 3.5.1

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-4.0.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 2d189cfe567de8687c0229cc70279c1bb3819773
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Tue Feb 6 09:57:58 2024 +0100

    Updated to kafka 3.5.1
---
 parent/pom.xml                                                     | 2 +-
 .../kafkaconnector/common/services/kafka/EmbeddedKafkaService.java | 3 ++-
 .../services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java  | 3 ++-
 .../common/services/kafkaconnect/KafkaConnectRunner.java           | 7 ++++---
 4 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/parent/pom.xml b/parent/pom.xml
index 8adb09ff1..e8c51b4b1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -27,7 +27,7 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 
-        <kafka.version>3.4.1</kafka.version>
+        <kafka.version>3.5.1</kafka.version>
         <camel.version>4.0.0</camel.version>
         <camel.kamelet.catalog.version>4.0.0</camel.kamelet.catalog.version>
         <apicurio.registry.version>1.3.2.Final</apicurio.registry.version>
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
index e10b7be17..63b6892fe 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
@@ -26,6 +26,7 @@ import org.apache.camel.kafkaconnector.common.PluginPathHelper;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class EmbeddedKafkaService implements KafkaService {
 
         String address = "http://localhost:" + NetworkUtils.getFreePort();
         LOG.info("Using the following address for  the listener configuration: {}", address);
-        workerProps.put(WorkerConfig.LISTENERS_CONFIG, address);
+        workerProps.put(RestServerConfig.LISTENERS_CONFIG, address);
 
         String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
 
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
index 98a69cdfa..764b3ccfb 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
@@ -21,6 +21,7 @@ import java.util.Properties;
 
 import org.apache.camel.kafkaconnector.common.PluginPathHelper;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +57,7 @@ class DefaultKafkaConnectPropertyFactory implements KafkaConnectPropertyFactory
 
         String address = NetworkUtils.getAddress("http");
         LOG.info("Using the following address for  the listener configuration: {}", address);
-        props.put(StandaloneConfig.LISTENERS_CONFIG, address);
+        props.put(RestServerConfig.LISTENERS_CONFIG, address);
 
         String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
         props.put(StandaloneConfig.PLUGIN_PATH_CONFIG, pluginPaths);
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
index ff1c26785..4e76fec8e 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
@@ -37,13 +37,14 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
 import org.apache.kafka.connect.runtime.rest.RestClient;
-import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,14 +132,14 @@ class KafkaConnectRunner {
         AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
 
         RestClient restClient = new RestClient(config);
-        RestServer rest = new RestServer(config, restClient);
+        ConnectRestServer rest = new ConnectRestServer(10, restClient, standAloneProperties);
         rest.initializeServer();
 
         /*
          According to the Kafka source code "... Worker runs a (dynamic) set of tasks
          in a set of threads, doing the work of actually moving data to/from Kafka ..."
          */
-        Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(), allConnectorClientConfigOverridePolicy);
+        Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(new StringConverter()), allConnectorClientConfigOverridePolicy);
 
         /*
         From Kafka source code: " ... The herder interface tracks and manages workers