You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 05:48:59 UTC
[pulsar] 09/14: Handle kafka sinks that return immutable maps as configs (#14780)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9ae8faed33e9dd3c46ad36f3e7872a6b8a2a62b3
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Mar 22 00:51:39 2022 -0700
Handle kafka sinks that return immutable maps as configs (#14780)
(cherry picked from commit b56d7318e73fb6915208dbe1223446e759c2ed0b)
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 28 +++++++++++++---------
.../connect/SchemaedFileStreamSinkConnector.java | 14 +++++++++++
2 files changed, 31 insertions(+), 11 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 268105c..e8165f8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -24,6 +24,18 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -44,17 +56,6 @@ import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
@Slf4j
@@ -154,6 +155,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
Preconditions.checkNotNull(configs);
Preconditions.checkArgument(configs.size() == 1);
+ // configs may contain immutable/unmodifiable maps
+ configs = configs.stream()
+ .map(HashMap::new)
+ .collect(Collectors.toList());
+
configs.forEach(x -> {
x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic());
});
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
index a3cce92..4a78661 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
@@ -22,6 +22,11 @@ package org.apache.pulsar.io.kafka.connect;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
* A FileStreamSinkConnector for testing that writes data other than just a value, i.e.:
* key, value, key and value schemas.
@@ -31,4 +36,13 @@ public class SchemaedFileStreamSinkConnector extends FileStreamSinkConnector {
public Class<? extends Task> taskClass() {
return SchemaedFileStreamSinkTask.class;
}
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ // to test the case where a task returns immutable maps as configs
+ return super.taskConfigs(maxTasks)
+ .stream()
+ .map(Collections::unmodifiableMap)
+ .collect(Collectors.toList());
+ }
}