Posted to issues@flink.apache.org by "Echo Lee (Jira)" <ji...@apache.org> on 2022/07/17 04:01:00 UTC
[jira] [Commented] (FLINK-28576) Kafka's two source api performance differences
[ https://issues.apache.org/jira/browse/FLINK-28576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567563#comment-17567563 ]
Echo Lee commented on FLINK-28576:
----------------------------------
CC [~martijnvisser] Could you help explain this difference? Thanks.
> Kafka's two source api performance differences
> ----------------------------------------------
>
> Key: FLINK-28576
> URL: https://issues.apache.org/jira/browse/FLINK-28576
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Echo Lee
> Priority: Major
>
> I recently found that the new Kafka source API (KafkaSource) delivers 10 times the throughput of the old one (FlinkKafkaConsumer), but I don't know what causes the difference.
>
> {code:java}
> // new source api
> KafkaSource<String> source = KafkaSource.<String>builder()
> .setBootstrapServers(brokers)
> .setTopics("benchmark")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setBounded(OffsetsInitializer.latest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
> .map(new MapFunction<String, Object>() {
> private int count = 0;
> private long lastTime = System.currentTimeMillis();
>
> @Override
> public Object map(String value) throws Exception {
> count++;
> if (count % 100000 == 0) {
> long currentTime = System.currentTimeMillis();
> // elapsed ms for the last 100,000 records in this parallel subtask
> System.out.println(currentTime - lastTime);
> lastTime = currentTime;
> }
> return null;
> }
> });{code}
>
> {code:java}
> // old source api
> // properties: Kafka consumer config (e.g. bootstrap.servers), not shown in this report
> FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>("benchmark",
> new SimpleStringSchema(), properties);
> flinkKafkaConsumer.setStartFromEarliest();
> env.addSource(flinkKafkaConsumer)
> .map(new MapFunction<String, Object>() {
> private int count = 0;
> private long lastTime = System.currentTimeMillis();
>
> @Override
> public Object map(String value) throws Exception {
> count++;
> if (count % 100000 == 0) {
> long currentTime = System.currentTimeMillis();
> // elapsed ms for the last 100,000 records in this parallel subtask
> System.out.println(currentTime - lastTime);
> lastTime = currentTime;
> }
> return null;
> }
> });{code}
>
> Both jobs consume the same data from the same topic.
> Flink version: 1.14.x
> Size of a single record: 1 KB
>
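Both jobs print the elapsed milliseconds per 100,000 records in each parallel subtask, so comparing them means converting those intervals into throughput. A minimal sketch in plain Java (no Flink dependency), assuming "1 KB" means 1024 bytes per record; the names IntervalThroughput, IntervalCounter, recordsPerSecond and mebibytesPerSecond are hypothetical and not part of Flink. IntervalCounter mirrors the count/lastTime logic of the map functions but takes the clock as a parameter so it can be exercised without a running job.

{code:java}
// Hypothetical helper, not part of Flink: mirrors the counting logic in the
// map functions and converts each printed interval into a throughput figure.
public class IntervalThroughput {

    // Same reporting interval the map functions use.
    static final long RECORDS_PER_INTERVAL = 100_000L;

    // Assumption: "1 KB" in the report means 1024 bytes per record.
    static final long BYTES_PER_RECORD = 1024L;

    /** Records per second for one printed interval (elapsed milliseconds). */
    static double recordsPerSecond(long elapsedMillis) {
        return RECORDS_PER_INTERVAL * 1000.0 / elapsedMillis;
    }

    /** Mebibytes per second for one printed interval, using BYTES_PER_RECORD. */
    static double mebibytesPerSecond(long elapsedMillis) {
        return recordsPerSecond(elapsedMillis) * BYTES_PER_RECORD / (1024.0 * 1024.0);
    }

    // Mirrors count/lastTime in the map functions; the current time is passed in.
    static final class IntervalCounter {
        private long count = 0;
        private long lastTime;

        IntervalCounter(long startMillis) {
            this.lastTime = startMillis;
        }

        /** Returns elapsed ms when an interval of 100,000 records completes, otherwise -1. */
        long onRecord(long nowMillis) {
            count++;
            if (count % RECORDS_PER_INTERVAL == 0) {
                long elapsed = nowMillis - lastTime;
                lastTime = nowMillis;
                return elapsed;
            }
            return -1L;
        }
    }

    public static void main(String[] args) {
        // Illustrative intervals only, not measurements from this issue.
        long[] sampleIntervalsMillis = {100L, 1000L};
        for (long ms : sampleIntervalsMillis) {
            System.out.printf("%d ms per 100,000 records -> %.0f records/s, %.1f MiB/s%n",
                    ms, recordsPerSecond(ms), mebibytesPerSecond(ms));
        }
    }
}
{code}

Because throughput is inversely proportional to the printed interval, an interval 10 times shorter in the KafkaSource job corresponds to 10 times the records per second in that subtask.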