You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/12 02:40:54 UTC
[pulsar] 01/14: [pulsar-io] throw exceptions when kafka offset backing store failed to start (#14491)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eb779ff8ccbe3eae80148f29715aa5713e330d91
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Wed Mar 2 16:35:58 2022 +0800
[pulsar-io] throw exceptions when kafka offset backing store failed to start (#14491)
(cherry picked from commit e6656e1407be80fdf4b6aaf424a57068687840cc)
---
.../io/kafka/connect/PulsarOffsetBackingStore.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 495c8b9..86905ad 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -30,6 +30,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -49,7 +51,7 @@ import org.apache.pulsar.client.api.Schema;
@Slf4j
public class PulsarOffsetBackingStore implements OffsetBackingStore {
- private Map<ByteBuffer, ByteBuffer> data;
+ private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();
private PulsarClient client;
private String topic;
private Producer<byte[]> producer;
@@ -65,7 +67,6 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
public void configure(WorkerConfig workerConfig) {
this.topic = workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
checkArgument(!isBlank(topic), "Offset storage topic must be specified");
- this.data = new HashMap<>();
log.info("Configure offset backing store on pulsar topic {}", topic);
}
@@ -126,10 +127,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
}
void processMessage(Message<byte[]> message) {
- synchronized (data) {
+ if (message.getKey() != null) {
data.put(
ByteBuffer.wrap(message.getKey().getBytes(UTF_8)),
ByteBuffer.wrap(message.getValue()));
+ } else {
+ log.debug("Got message without key from the offset storage topic, skip it. message value: {}",
+ message.getValue());
}
}
@@ -149,10 +153,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
CompletableFuture<Void> endFuture = new CompletableFuture<>();
readToEnd(endFuture);
- endFuture.join();
+ endFuture.get();
} catch (PulsarClientException e) {
log.error("Failed to setup pulsar producer/reader to cluster", e);
throw new RuntimeException("Failed to setup pulsar producer/reader to cluster ", e);
+ } catch (ExecutionException | InterruptedException e) {
+ log.error("Failed to start PulsarOffsetBackingStore", e);
+ throw new RuntimeException("Failed to start PulsarOffsetBackingStore", e);
}
}
@@ -180,6 +187,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
}
reader = null;
}
+ data.clear();
// do not close the client, it is provided by the sink context
}
@@ -191,10 +199,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
return endFuture.thenApply(ignored -> {
Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
for (ByteBuffer key : keys) {
- ByteBuffer value;
- synchronized (data) {
- value = data.get(key);
- }
+ ByteBuffer value = data.get(key);
if (null != value) {
values.put(key, value);
}