You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Echo Lee (Jira)" <ji...@apache.org> on 2022/07/17 04:01:00 UTC

[jira] [Commented] (FLINK-28576) Kafka's two source api performance differences

    [ https://issues.apache.org/jira/browse/FLINK-28576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567563#comment-17567563 ] 

Echo Lee commented on FLINK-28576:
----------------------------------

CC [~martijnvisser] Can you help explain this difference? thanks.

> Kafka's two source api performance differences
> ----------------------------------------------
>
>                 Key: FLINK-28576
>                 URL: https://issues.apache.org/jira/browse/FLINK-28576
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Echo Lee
>            Priority: Major
>
> I recently found out that the new kafka source api is 10 times more performant than the old one, but don't know what's causing it.
>  
> {code:java}
> // new source api
> KafkaSource<String> source = KafkaSource.<String>builder()
>                 .setBootstrapServers(brokers)
>                 .setTopics("benchmark")
>                 .setGroupId("my-group")
>                 .setStartingOffsets(OffsetsInitializer.earliest())
>                 .setBounded(OffsetsInitializer.latest())
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .build(); 
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
>         .map(new MapFunction<String, Object>() {
>             private int count = 0;
>             private long lastTime = System.currentTimeMillis();
>                      
>                   @Override
>             public Object map(String value) throws Exception {
>                 count++;
>                 if (count % 100000 == 0) {
>                     long currentTime = System.currentTimeMillis();
>                     System.out.println(currentTime - lastTime);                                   lastTime = currentTime;              
>                       }
>                 return null;
>             }
>         });{code}
>  
>  
>  
> {code:java}
> // old source api
> FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>("benchmark",
>                 new SimpleStringSchema(), properties);
> flinkKafkaConsumer.setStartFromEarliest(); 
> env.addSource(flinkKafkaConsumer)
>         .map(new MapFunction<String, Object>() {
>             private int count = 0;            
>                   private long lastTime = System.currentTimeMillis();
>           
>                   @Override
>             public Object map(String value) throws Exception {
>                 count++;
>                 if (count % 100000 == 0) {
>                     long currentTime = System.currentTimeMillis();
>                     System.out.println(currentTime - lastTime);                                    lastTime = currentTime;              
>                        }
>                 return null;
>             }
>         });{code}
>  
> Two ways to use the same data of the same topic.
> Flink version: 1.14.x
> Single data size: 1k
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)