Posted to dev@flink.apache.org by "goldenyang (Jira)" <ji...@apache.org> on 2022/03/15 04:07:00 UTC

[jira] [Created] (FLINK-26642) Pulsar sink fails with non-partitioned topic

goldenyang created FLINK-26642:
----------------------------------

             Summary: Pulsar sink fails with non-partitioned topic
                 Key: FLINK-26642
                 URL: https://issues.apache.org/jira/browse/FLINK-26642
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Pulsar
    Affects Versions: 1.15.0
            Reporter: goldenyang


Flink now supports a Pulsar sink, added in [FLINK-20732|https://issues.apache.org/jira/browse/FLINK-20732]. I ran into a problem when using the Pulsar sink on the master branch with a non-partitioned topic.

My tests so far show that partitioned topics, and non-partitioned topics whose names end with -partition-i, both work. Ordinary non-partitioned topics without the -partition-i suffix, such as 'test_topic', fail.
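
To make the naming distinction concrete, here is a minimal sketch that checks whether a topic name carries a -partition-i suffix. The class and method names (TopicNameSketch, hasPartitionSuffix) are hypothetical helpers written for this report; they are not part of the Flink or Pulsar APIs, and the connector may classify topics differently.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not Flink or Pulsar code.
final class TopicNameSketch {
    // Matches names that end with "-partition-<i>", where <i> is a non-negative integer.
    private static final Pattern PARTITION_SUFFIX = Pattern.compile(".*-partition-\\d+$");

    private TopicNameSketch() {}

    /** Returns true if the topic name ends with a "-partition-<i>" suffix. */
    static boolean hasPartitionSuffix(String topic) {
        return PARTITION_SUFFIX.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // A name like the failing one in this report: no suffix.
        System.out.println(hasPartitionSuffix("test_topic"));
        // A name of the kind that worked in my tests: suffix present.
        System.out.println(hasPartitionSuffix("test_topic-partition-0"));
    }
}
```

Under this check, 'test_topic' has no suffix and 'test_topic-partition-0' does, which matches the split between the failing and working cases described above.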

To reproduce the problem, write to a non-partitioned topic. The stack trace from the exception is below. I discussed this briefly with [~Jianyun Zhao], and it may be a bug.

 
{code:java}
2022-03-08 21:39:13,622 - INFO - [flink-akka.actor.default-dispatcher-13:Execution@1419] - Source: Pulsar Source -> (Sink: Writer -> Sink: Committer, Sink: Print to Std. Out) (1/6) (44af5e8a2b9d553952c7ed3e5d40e672) switched from RUNNING to FAILED on 54284e57-42a9-4e2e-9c41-54b0ad559832 @ 127.0.0.1 (dataPort=-1).
java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash.
    at org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)
    at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:54)
    at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:138)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:124)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:205)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41)
    at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
    at org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:382)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
{code}
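
The trace shows a precondition check inside RoundRobinTopicRouter.route rejecting its input. The following is a simplified stand-in for that failure mode, not the connector's actual code: it assumes (my guess, not confirmed) that the router received an empty list of topic partitions for the non-partitioned topic. The class name RoundRobinSketch and its exception message are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for a round-robin topic router; NOT the Flink implementation.
final class RoundRobinSketch {
    private final AtomicLong counter = new AtomicLong();

    /** Picks the next partition in rotation; rejects an empty partition list. */
    String route(List<String> partitions) {
        // Assumed failure mode: nothing was resolved for the target topic.
        if (partitions.isEmpty()) {
            throw new IllegalArgumentException("no topic partitions available for routing");
        }
        // floorMod keeps the index non-negative even if the counter overflows.
        int index = (int) Math.floorMod(counter.getAndIncrement(), (long) partitions.size());
        return partitions.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch router = new RoundRobinSketch();
        List<String> partitions = Arrays.asList("t-partition-0", "t-partition-1");
        System.out.println(router.route(partitions));
        System.out.println(router.route(partitions));
        try {
            router.route(Collections.<String>emptyList());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

If the assumption holds, the router itself behaves as designed and the fault lies earlier, wherever the list of partitions for a plain topic name such as 'test_topic' is built.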
 
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)