You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/01/11 03:25:14 UTC

[pulsar] branch master updated: [pulsar-io] Fix invalid topic name generation in kafka-source-connector (#9035)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c3a743  [pulsar-io] Fix invalid topic name generation in kafka-source-connector (#9035)
1c3a743 is described below

commit 1c3a743749924c7c2b7d865565d1fb3c548511c5
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Jan 10 19:24:36 2021 -0800

    [pulsar-io] Fix invalid topic name generation in kafka-source-connector (#9035)
    
    ### Motivation
    
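    Right now, the kafka-source-connector builds the destination topic name as `<namespace>/<topic>` without the `persistent://` prefix. This produces an invalid topic name, which causes an error while creating the producer in the Debezium io-source.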
    
    ```
    21:22:41,772 DEBUG [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] JavaInstance - Got result: object: (key = "[B@3bbd472c", value = "[B@1aa204ba")
    21:22:41,780 ERROR [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] PulsarSink - Failed to create Producer while doing user publish
    org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-property/us-west/my-ns/topic1'
    	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:892) ~[org.apache.pulsar-pulsar-client-api-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) ~[org.apache.pulsar-pulsar-client-original-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.createProducer(PulsarSink.java:107) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:117) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) [?:?]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:115) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:111) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.newMessage(PulsarSink.java:184) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:295) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:449) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:431) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [?:?]
    	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883) [?:?]
    	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251) [?:?]
    	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:422) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:283) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at java.lang.Thread.run(Thread.java:834) [?:?]
    21:22:41,781 INFO [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in sink write:
    java.lang.RuntimeException: org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-property/us-west/my-ns/topic1'
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:124) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[?:?]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:115) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:111) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.newMessage(PulsarSink.java:184) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:295) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:449) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:431) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [?:?]
    	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883) [?:?]
    	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251) [?:?]
    	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:422) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:283) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at java.lang.Thread.run(Thread.java:834) [?:?]
    Caused by: org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-property/us-west/my-ns/topic1'
    	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:892) ~[org.apache.pulsar-pulsar-client-api-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) ~[org.apache.pulsar-pulsar-client-original-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.createProducer(PulsarSink.java:107) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:117) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
    	... 13 more
    21:22:41,790 INFO [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] AvroDataConfig - AvroDataConfig values:
    	schemas.cache.config = 1000
    	enhanced.avro.schema.support = false
    	connect.meta.data = true
    ```
---
 .../org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 38f6a70..5a6a1a6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -189,7 +189,7 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
         KafkaSchemaWrappedSchema valueSchema;
 
         AbstractKafkaSourceRecord(SourceRecord srcRecord) {
-            this.destinationTopic = Optional.of(topicNamespace + "/" + srcRecord.topic());
+            this.destinationTopic = Optional.of("persistent://"+topicNamespace + "/" + srcRecord.topic());
         }
 
         @Override