You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/12 02:40:54 UTC

[pulsar] 01/14: [pulsar-io] throw exceptions when kafka offset backing store failed to start (#14491)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb779ff8ccbe3eae80148f29715aa5713e330d91
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Wed Mar 2 16:35:58 2022 +0800

    [pulsar-io] throw exceptions when kafka offset backing store failed to start (#14491)
    
    (cherry picked from commit e6656e1407be80fdf4b6aaf424a57068687840cc)
---
 .../io/kafka/connect/PulsarOffsetBackingStore.java  | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 495c8b9..86905ad 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -30,6 +30,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -49,7 +51,7 @@ import org.apache.pulsar.client.api.Schema;
 @Slf4j
 public class PulsarOffsetBackingStore implements OffsetBackingStore {
 
-    private Map<ByteBuffer, ByteBuffer> data;
+    private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();
     private PulsarClient client;
     private String topic;
     private Producer<byte[]> producer;
@@ -65,7 +67,6 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
     public void configure(WorkerConfig workerConfig) {
         this.topic = workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
         checkArgument(!isBlank(topic), "Offset storage topic must be specified");
-        this.data = new HashMap<>();
 
         log.info("Configure offset backing store on pulsar topic {}", topic);
     }
@@ -126,10 +127,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
     }
 
     void processMessage(Message<byte[]> message) {
-        synchronized (data) {
+        if (message.getKey() != null) {
             data.put(
                 ByteBuffer.wrap(message.getKey().getBytes(UTF_8)),
                 ByteBuffer.wrap(message.getValue()));
+        } else {
+            log.debug("Got message without key from the offset storage topic, skip it. message value: {}",
+                    message.getValue());
         }
     }
 
@@ -149,10 +153,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
 
             CompletableFuture<Void> endFuture = new CompletableFuture<>();
             readToEnd(endFuture);
-            endFuture.join();
+            endFuture.get();
         } catch (PulsarClientException e) {
             log.error("Failed to setup pulsar producer/reader to cluster", e);
             throw new RuntimeException("Failed to setup pulsar producer/reader to cluster ",  e);
+        } catch (ExecutionException | InterruptedException e) {
+            log.error("Failed to start PulsarOffsetBackingStore", e);
+            throw new RuntimeException("Failed to start PulsarOffsetBackingStore",  e);
         }
     }
 
@@ -180,6 +187,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
             }
             reader = null;
         }
+        data.clear();
 
         // do not close the client, it is provided by the sink context
     }
@@ -191,10 +199,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
         return endFuture.thenApply(ignored -> {
             Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
             for (ByteBuffer key : keys) {
-                ByteBuffer value;
-                synchronized (data) {
-                    value = data.get(key);
-                }
+                ByteBuffer value = data.get(key);
                 if (null != value) {
                     values.put(key, value);
                 }