You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by max <ma...@foxmail.com> on 2022/06/16 03:51:02 UTC
About FlinkkafkaConsumer msg delay Pyflink[1.15]
Hi:
We use PyFlink 1.15 but see a large delay, averaging about 500 ms. The same logic written in Java has a delay in the range of 1-6 ms. Is there any way to fix this?
Thanks
pyflink demo code:
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment,RuntimeExecutionMode
from pyflink.datastream.connectors import FlinkKafkaConsumer,FlinkKafkaProducer
from pyflink.datastream.functions import RuntimeContext, MapFunction
import time,json
def mymap(value):
    """Print the delivery delay of one message and tag it with the receive time.

    ``value`` is the raw message text. It is decoded with ``json.loads``; the
    decoded payload is converted with ``float`` and subtracted from
    ``time.time()``, i.e. it is treated as a send time in epoch seconds.

    Returns the string ``"<decoded payload>_<receive time>"``.
    Raises whatever ``json.loads`` / ``float`` raise for a payload that is not
    valid JSON or not numeric.
    """
    now = time.time()
    sv = json.loads(value)
    num = float(sv)
    print(now, "recv:", value, "span:", now - num)
    # json.loads returns a float/int for a bare numeric payload; str() keeps the
    # concatenation from raising TypeError then and is a no-op for string payloads.
    return str(sv) + "_" + str(now)
def demo1():
    """Run the demo job: Kafka topic ``kafka_demo`` -> ``mymap`` -> ``test_producer_topic``.

    Uses a single execution environment with parallelism 1 and the same Kafka
    property dict for both the consumer and the producer.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)
    env.set_parallelism(1)

    kafka_props = {
        'bootstrap.servers': "127.0.0.1:9092",
        'group.id': "test_group_1",
    }

    # Start the consumer
    consumer = FlinkKafkaConsumer(
        topics="kafka_demo",
        deserialization_schema=SimpleStringSchema(),
        properties=kafka_props,
    )
    stream = env.add_source(consumer).set_parallelism(1)

    # Producer that receives the mapped records
    producer = FlinkKafkaProducer(
        topic="test_producer_topic",
        serialization_schema=SimpleStringSchema(),
        producer_config=kafka_props,
    )
    stream.map(mymap, Types.STRING()).add_sink(producer)

    env.execute("Test")
# Script entry point: announce start-up, then run the demo job.
if __name__ == '__main__':
    print("start flink_demo1")
    demo1()
java code:
package com.lhhj;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
/**
 * Latency probe job: consumes records from the {@code kafka_demo} topic, prints how long each
 * one took to arrive, and writes the result to {@code test_producer_topic}.
 *
 * <p>NOTE(review): {@code CharSchema} is not defined in this snippet; it is used as both the
 * value deserializer and the value serializer for {@code byte[]} records.
 */
public class Test {
    /**
     * Parses {@code value} as a UTF-8 decimal number, treats it as a send time in seconds (it is
     * scaled by 1000 before being compared with {@link System#currentTimeMillis()}), prints the
     * difference, and returns {@code "<sendMillis>_<nowMillis>"} encoded as UTF-8.
     *
     * @param value raw record payload
     * @return the encoded {@code "<sendMillis>_<nowMillis>"} string, or {@code value} unchanged
     *     when it cannot be parsed
     */
    public static byte[] ProcessMsg(byte[] value) {
        try {
            long now = System.currentTimeMillis();
            // Use the Charset constant for decoding and encoding so both directions agree and
            // neither depends on the platform default charset.
            String sb = new String(value, java.nio.charset.StandardCharsets.UTF_8);
            double recvf = Double.parseDouble(sb) * 1000;
            long recv = (long) recvf;
            System.out.println("recv msg " + recv + " now:" + now + " diff:" + (now - recv));
            String ret = recv + "_" + now;
            return ret.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        } catch (RuntimeException e) {
            // Best effort: a malformed (or null) payload is reported and passed through as-is.
            System.out.println("err msg:" + e.getMessage());
            return value;
        }
    }

    /**
     * Builds and runs the pipeline: Kafka source (latest offsets, no watermarks) to
     * {@link #ProcessMsg(byte[])}, then a filter that drops empty payloads, then the Kafka sink.
     * Parallelism is fixed at 1.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println("Hello World! FlinkDelayTest");
        String broker = "127.0.0.1:9092";

        KafkaSource<byte[]> source = KafkaSource.<byte[]>builder()
                .setBootstrapServers(broker)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setTopics("kafka_demo")
                .setValueOnlyDeserializer(new CharSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<byte[]> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "cal_req");

        KafkaSink<byte[]> sink = KafkaSink.<byte[]>builder()
                .setBootstrapServers(broker)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setValueSerializationSchema(new CharSchema())
                        .setTopicSelector((record) -> "test_producer_topic")
                        .build())
                .build();

        stream.map(new MapFunction<byte[], byte[]>() {
            @Override
            public byte[] map(byte[] value) {
                return ProcessMsg(value);
            }
        }).filter(new FilterFunction<byte[]>() {
            @Override
            public boolean filter(byte[] value) throws Exception {
                // Keep only non-empty payloads.
                return value.length > 0;
            }
        }).sinkTo(sink);

        try {
            env.execute("FlinkDelayTest");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
max.zc@foxmail.com