You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 05:48:59 UTC

[pulsar] 09/14: Handle kafka sinks that return immutable maps as configs (#14780)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9ae8faed33e9dd3c46ad36f3e7872a6b8a2a62b3
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Mar 22 00:51:39 2022 -0700

    Handle kafka sinks that return immutable maps as configs (#14780)
    
    (cherry picked from commit b56d7318e73fb6915208dbe1223446e759c2ed0b)
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 28 +++++++++++++---------
 .../connect/SchemaedFileStreamSinkConnector.java   | 14 +++++++++++
 2 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 268105c..e8165f8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -24,6 +24,18 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -44,17 +56,6 @@ import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
 import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
 
 @Slf4j
@@ -154,6 +155,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         Preconditions.checkNotNull(configs);
         Preconditions.checkArgument(configs.size() == 1);
 
+        // configs may contain immutable/unmodifiable maps
+        configs = configs.stream()
+                .map(HashMap::new)
+                .collect(Collectors.toList());
+
         configs.forEach(x -> {
             x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic());
         });
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
index a3cce92..4a78661 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
@@ -22,6 +22,11 @@ package org.apache.pulsar.io.kafka.connect;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.file.FileStreamSinkConnector;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * A FileStreamSinkConnector for testing that writes data other than just a value, i.e.:
  * key, value, key and value schemas.
@@ -31,4 +36,13 @@ public class SchemaedFileStreamSinkConnector extends FileStreamSinkConnector {
     public Class<? extends Task> taskClass() {
         return SchemaedFileStreamSinkTask.class;
     }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // wrap each parent config in an unmodifiable map to test connectors returning read-only configs
+        return super.taskConfigs(maxTasks)
+                .stream()
+                .map(Collections::unmodifiableMap)
+                .collect(Collectors.toList());
+    }
 }