Posted to dev@flink.apache.org by "Echo Lee (Jira)" <ji...@apache.org> on 2022/07/17 03:55:00 UTC
[jira] [Created] (FLINK-28576) Kafka's two source api performance differences
Echo Lee created FLINK-28576:
--------------------------------
Summary: Kafka's two source api performance differences
Key: FLINK-28576
URL: https://issues.apache.org/jira/browse/FLINK-28576
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Reporter: Echo Lee
I recently found that the new Kafka source API (KafkaSource) is 10 times faster than the old one (FlinkKafkaConsumer), but I don't know what is causing the difference.
{code:java}
// new source api
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("benchmark")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .map(new MapFunction<String, Object>() {
            private int count = 0;
            private long lastTime = System.currentTimeMillis();

            @Override
            public Object map(String value) throws Exception {
                count++;
                // print the elapsed milliseconds for every 100000 records
                if (count % 100000 == 0) {
                    long currentTime = System.currentTimeMillis();
                    System.out.println(currentTime - lastTime);
                    lastTime = currentTime;
                }
                return null;
            }
        });{code}
{code:java}
// old source api
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>("benchmark",
        new SimpleStringSchema(), properties);
flinkKafkaConsumer.setStartFromEarliest();
env.addSource(flinkKafkaConsumer)
        .map(new MapFunction<String, Object>() {
            private int count = 0;
            private long lastTime = System.currentTimeMillis();

            @Override
            public Object map(String value) throws Exception {
                count++;
                // print the elapsed milliseconds for every 100000 records
                if (count % 100000 == 0) {
                    long currentTime = System.currentTimeMillis();
                    System.out.println(currentTime - lastTime);
                    lastTime = currentTime;
                }
                return null;
            }
        });{code}
Both jobs consume the same data from the same topic.
Flink version: 1.14.x
Size of a single record: 1k
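For comparing the two runs, the milliseconds printed per 100000 records can be converted into a throughput figure. The following is only a rough sketch: the class and method names are hypothetical, "1k" is assumed to mean 1024 bytes, and the elapsed time in main is a made-up placeholder, not a measurement from either job.

```java
public final class ThroughputEstimate {

    /** Records per second, given the milliseconds taken to process one batch. */
    public static double recordsPerSecond(long batchSize, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return batchSize * 1000.0 / elapsedMillis;
    }

    /** MiB per second, assuming every record has the same size in bytes. */
    public static double mebibytesPerSecond(long batchSize, long elapsedMillis, long recordBytes) {
        return recordsPerSecond(batchSize, elapsedMillis) * recordBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long batchSize = 100_000;  // matches the count % 100000 check in the map function
        long recordBytes = 1024;   // assumption: "1k" means 1024 bytes
        long elapsedMillis = 500;  // hypothetical value, replace with a printed reading

        System.out.printf("%.0f records/s, %.1f MiB/s%n",
                recordsPerSecond(batchSize, elapsedMillis),
                mebibytesPerSecond(batchSize, elapsedMillis, recordBytes));
    }
}
```

Feeding the values printed by each job into recordsPerSecond gives two numbers whose ratio can be checked against the 10x difference described above.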
--
This message was sent by Atlassian Jira
(v8.20.10#820010)