You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by max <ma...@foxmail.com> on 2022/06/16 03:51:02 UTC

About FlinkKafkaConsumer message delay in PyFlink [1.15]

Hi:
    We use PyFlink [1.15], but we find it has a large delay, averaging about 500 ms. With equivalent Java code, the delay is in the range of 1-6 ms. Does anyone have an idea how to fix this?

Thanks


pyflink demo code:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment,RuntimeExecutionMode
from pyflink.datastream.connectors import FlinkKafkaConsumer,FlinkKafkaProducer

from pyflink.datastream.functions import RuntimeContext, MapFunction
import time,json

def mymap(value):
    """Annotate a Kafka message with the local receive time.

    The incoming record is expected to be a JSON-encoded UNIX timestamp
    (seconds) captured by the producer. Logs the observed end-to-end
    delay and returns "<original payload>_<receive time>".

    :param value: raw record value (JSON text of a number).
    :return: str of the form "<payload>_<now>".
    :raises ValueError: if the payload is not valid JSON / not numeric.
    """
    now = time.time()
    sv = json.loads(value)
    # json.loads yields a float for a bare number ("1.5") but a str for a
    # quoted one ('"1.5"'); the original code crashed with TypeError on the
    # float case when concatenating below. float() validates either form.
    num = float(sv)
    print(now, "recv:", value, "span:", now - num)

    # str(sv) is a no-op for str payloads, and fixes the float case.
    return str(sv) + "_" + str(now)


def demo1():
    """Build and run the latency-measurement pipeline:
    Kafka ("kafka_demo") -> mymap -> Kafka ("test_producer_topic").
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)
    env.set_parallelism(1)

    # Start the consumer: read raw strings from the "kafka_demo" topic.
    props = {
        'bootstrap.servers': "127.0.0.1:9092",
        'group.id': "test_group_1",
    }
    consumer = FlinkKafkaConsumer(
        topics="kafka_demo",
        deserialization_schema=SimpleStringSchema(),
        properties=props,
    )

    # Producer: forward the annotated records to "test_producer_topic".
    producer = FlinkKafkaProducer(
        topic="test_producer_topic",
        serialization_schema=SimpleStringSchema(),
        producer_config=props,
    )

    stream = env.add_source(consumer).set_parallelism(1)
    stream.map(mymap, Types.STRING()).add_sink(producer)

    env.execute("Test")


if __name__ == '__main__':
    # Entry point: kick off the latency-measurement job.
    print("start flink_demo1")
    demo1()



java code:

package com.lhhj;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// NOTE(review): the import below appears unused; kept as-is.
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;



/**
 * Latency-measurement job: reads producer-timestamped messages from the
 * "kafka_demo" topic, logs the receive delay, and forwards the annotated
 * payload to "test_producer_topic".
 */
public class Test {

    /**
     * Parses the record value as a UNIX timestamp in seconds and returns
     * "&lt;sendMillis&gt;_&lt;nowMillis&gt;" as UTF-8 bytes, logging the
     * observed delay. On any parse failure the original payload is
     * returned unchanged so the pipeline keeps flowing.
     *
     * @param value raw Kafka record value (UTF-8 text of a double)
     * @return annotated payload, or {@code value} on error
     */
    public static byte[] ProcessMsg(byte[] value) {
        try {
            long now = System.currentTimeMillis();
            // Decode/encode explicitly as UTF-8: the no-charset String APIs
            // use the platform default charset (not guaranteed UTF-8 before
            // Java 18), and the "UTF-8" string overload forces a checked
            // UnsupportedEncodingException. StandardCharsets avoids both.
            String sb = new String(value, StandardCharsets.UTF_8);
            double recvf = Double.parseDouble(sb) * 1000; // seconds -> millis
            long recv = (long) recvf;
            System.out.println("recv msg " + recv + "  now:" + now + "  diff:" + (now - recv));
            String ret = Long.toString(recv) + "_" + Long.toString(now);
            return ret.getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            System.out.println("err msg:" + e.getMessage());
            return value;
        }
    }

    public static void main(String[] args) {
        System.out.println("Hello World! FlinkDelayTest");
        String broker = "127.0.0.1:9092";

        // Source: consume raw byte[] values from "kafka_demo", starting at
        // the latest offset so only freshly produced messages are measured.
        KafkaSource<byte[]> source = KafkaSource.<byte[]>builder()
                .setBootstrapServers(broker)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setTopics("kafka_demo")
                .setValueOnlyDeserializer(new CharSchema())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<byte[]> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "cal_req");

        // Sink: publish the annotated payloads to "test_producer_topic".
        KafkaSink<byte[]> sink = KafkaSink.<byte[]>builder()
                .setBootstrapServers(broker)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setValueSerializationSchema(new CharSchema())
                        .setTopicSelector((record) -> {
                            return "test_producer_topic";
                        })
                        .build())
                .build();

        stream.map(new MapFunction<byte[], byte[]>() {
            @Override
            public byte[] map(byte[] value) {
                return ProcessMsg(value);
            }
        }).filter(new FilterFunction<byte[]>() {
            @Override
            public boolean filter(byte[] value) throws Exception {
                // Drop empty payloads before they reach the sink.
                return value.length > 0;
            }
        }).sinkTo(sink);

        try {
            env.execute("FlinkDelayTest");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}




max.zc@foxmail.com