You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/01/11 03:25:14 UTC
[pulsar] branch master updated: [pulsar-io] Fix invalid topic name
generation in kafka-source-connector (#9035)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1c3a743 [pulsar-io] Fix invalid topic name generation in kafka-source-connector (#9035)
1c3a743 is described below
commit 1c3a743749924c7c2b7d865565d1fb3c548511c5
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Jan 10 19:24:36 2021 -0800
[pulsar-io] Fix invalid topic name generation in kafka-source-connector (#9035)
### Motivation
Right now, kafka-source-connector creates invalid topic name which causes error while creating producer in debezium io-source.
```
21:22:41,772 DEBUG [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] JavaInstance - Got result: object: (key = "[B@3bbd472c", value = "[B@1aa204ba")
21:22:41,780 ERROR [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] PulsarSink - Failed to create Producer while doing user publish
org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-property/us-west/my-ns/topic1'
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:892) ~[org.apache.pulsar-pulsar-client-api-2.6.2.jar:2.6.2]
at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) ~[org.apache.pulsar-pulsar-client-original-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.createProducer(PulsarSink.java:107) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:117) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) [?:?]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:115) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:111) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.newMessage(PulsarSink.java:184) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:295) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:449) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:431) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [?:?]
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883) [?:?]
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251) [?:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:422) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:283) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at java.lang.Thread.run(Thread.java:834) [?:?]
21:22:41,781 INFO [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in sink write:
java.lang.RuntimeException: org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-property/us-west/my-ns/topic1'
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:124) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[?:?]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:115) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:111) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.newMessage(PulsarSink.java:184) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:295) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:449) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:431) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [?:?]
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883) [?:?]
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251) [?:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:422) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:283) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-property/us-west/my-ns/topic1'
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:892) ~[org.apache.pulsar-pulsar-client-api-2.6.2.jar:2.6.2]
at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) ~[org.apache.pulsar-pulsar-client-original-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.createProducer(PulsarSink.java:107) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:117) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
... 13 more
21:22:41,790 INFO [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] AvroDataConfig - AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
```
---
.../org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 38f6a70..5a6a1a6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -189,7 +189,7 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
KafkaSchemaWrappedSchema valueSchema;
AbstractKafkaSourceRecord(SourceRecord srcRecord) {
- this.destinationTopic = Optional.of(topicNamespace + "/" + srcRecord.topic());
+ this.destinationTopic = Optional.of("persistent://"+topicNamespace + "/" + srcRecord.topic());
}
@Override