Posted to dev@flink.apache.org by "Echo Lee (Jira)" <ji...@apache.org> on 2022/07/17 03:55:00 UTC

[jira] [Created] (FLINK-28576) Kafka's two source api performance differences

Echo Lee created FLINK-28576:
--------------------------------

             Summary: Kafka's two source api performance differences
                 Key: FLINK-28576
                 URL: https://issues.apache.org/jira/browse/FLINK-28576
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
            Reporter: Echo Lee


I recently found that the new Kafka source API (KafkaSource) is about 10 times faster than the old one (FlinkKafkaConsumer), but I don't know what causes the difference.

 
{code:java}
// new source api
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("benchmark")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .map(new MapFunction<String, Object>() {
            private int count = 0;
            private long lastTime = System.currentTimeMillis();

            @Override
            public Object map(String value) throws Exception {
                count++;
                // Print the elapsed milliseconds for every 100000 records.
                if (count % 100000 == 0) {
                    long currentTime = System.currentTimeMillis();
                    System.out.println(currentTime - lastTime);
                    lastTime = currentTime;
                }
                return null;
            }
        });{code}
 

 

 
{code:java}
// old source api
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>("benchmark",
        new SimpleStringSchema(), properties);
flinkKafkaConsumer.setStartFromEarliest();

env.addSource(flinkKafkaConsumer)
        .map(new MapFunction<String, Object>() {
            private int count = 0;
            private long lastTime = System.currentTimeMillis();

            @Override
            public Object map(String value) throws Exception {
                count++;
                // Print the elapsed milliseconds for every 100000 records.
                if (count % 100000 == 0) {
                    long currentTime = System.currentTimeMillis();
                    System.out.println(currentTime - lastTime);
                    lastTime = currentTime;
                }
                return null;
            }
        });{code}
 

Both jobs read the same data from the same topic.

Flink version: 1.14.x

Size of a single record: 1k
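
For comparing the two jobs, the elapsed time that each map function prints per 100000 records can be converted into a throughput figure. The following is only a sketch: the class ThroughputEstimate and its methods are hypothetical helpers (not part of Flink), "1k" is assumed to mean 1024 bytes, and the elapsed time used in main is a placeholder, not a measurement from this report.

```java
// Hypothetical helper, not part of Flink: converts the "milliseconds per batch"
// value printed by the map functions into throughput figures.
public final class ThroughputEstimate {

    private ThroughputEstimate() {
    }

    /** Records per second for a batch of {@code records} that took {@code elapsedMillis}. */
    public static double recordsPerSecond(long records, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return records * 1000.0 / elapsedMillis;
    }

    /** MiB per second, given the size of one record in bytes. */
    public static double mebibytesPerSecond(long records, long elapsedMillis, long bytesPerRecord) {
        return recordsPerSecond(records, elapsedMillis) * bytesPerRecord / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long batchSize = 100_000;     // matches "count % 100000" in the jobs
        long bytesPerRecord = 1024;   // assumption: "1k" means 1024 bytes
        long elapsedMillis = 500;     // placeholder value, not a measured result

        System.out.printf("%.0f records/s, %.2f MiB/s%n",
                recordsPerSecond(batchSize, elapsedMillis),
                mebibytesPerSecond(batchSize, elapsedMillis, bytesPerRecord));
    }
}
```

Feeding the values printed by each job into recordsPerSecond gives directly comparable numbers for the old and new source.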

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)