You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2024/02/06 11:37:33 UTC
(camel-kafka-connector) 01/03: Updated to kafka 3.5.1
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch camel-kafka-connector-4.0.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 2d189cfe567de8687c0229cc70279c1bb3819773
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Tue Feb 6 09:57:58 2024 +0100
Updated to kafka 3.5.1
---
parent/pom.xml | 2 +-
.../kafkaconnector/common/services/kafka/EmbeddedKafkaService.java | 3 ++-
.../services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java | 3 ++-
.../common/services/kafkaconnect/KafkaConnectRunner.java | 7 ++++---
4 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/parent/pom.xml b/parent/pom.xml
index 8adb09ff1..e8c51b4b1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -27,7 +27,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <kafka.version>3.4.1</kafka.version>
+ <kafka.version>3.5.1</kafka.version>
<camel.version>4.0.0</camel.version>
<camel.kamelet.catalog.version>4.0.0</camel.kamelet.catalog.version>
<apicurio.registry.version>1.3.2.Final</apicurio.registry.version>
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
index e10b7be17..63b6892fe 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
@@ -26,6 +26,7 @@ import org.apache.camel.kafkaconnector.common.PluginPathHelper;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
import org.apache.camel.test.infra.kafka.services.KafkaService;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class EmbeddedKafkaService implements KafkaService {
String address = "http://localhost:" + NetworkUtils.getFreePort();
LOG.info("Using the following address for the listener configuration: {}", address);
- workerProps.put(WorkerConfig.LISTENERS_CONFIG, address);
+ workerProps.put(RestServerConfig.LISTENERS_CONFIG, address);
String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
index 98a69cdfa..764b3ccfb 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
@@ -21,6 +21,7 @@ import java.util.Properties;
import org.apache.camel.kafkaconnector.common.PluginPathHelper;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +57,7 @@ class DefaultKafkaConnectPropertyFactory implements KafkaConnectPropertyFactory
String address = NetworkUtils.getAddress("http");
LOG.info("Using the following address for the listener configuration: {}", address);
- props.put(StandaloneConfig.LISTENERS_CONFIG, address);
+ props.put(RestServerConfig.LISTENERS_CONFIG, address);
String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
props.put(StandaloneConfig.PLUGIN_PATH_CONFIG, pluginPaths);
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
index ff1c26785..4e76fec8e 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
@@ -37,13 +37,14 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
import org.apache.kafka.connect.runtime.rest.RestClient;
-import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,14 +132,14 @@ class KafkaConnectRunner {
AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
RestClient restClient = new RestClient(config);
- RestServer rest = new RestServer(config, restClient);
+ ConnectRestServer rest = new ConnectRestServer(10, restClient, standAloneProperties);
rest.initializeServer();
/*
According to the Kafka source code "... Worker runs a (dynamic) set of tasks
in a set of threads, doing the work of actually moving data to/from Kafka ..."
*/
- Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(), allConnectorClientConfigOverridePolicy);
+ Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(new StringConverter()), allConnectorClientConfigOverridePolicy);
/*
From Kafka source code: " ... The herder interface tracks and manages workers