You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Ye Chen <ch...@163.com> on 2021/02/20 07:26:27 UTC

自定义partition,使用遇到问题,附代码

各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。
//自定义partition
public class customPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return 0;
    }
}


DataStream<String> stream = 。。。
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                "test_topic",
                new SimpleStringSchema(),
                properties,
                new customPartitioner()
        );
stream.addSink(myProducer);


//上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java: 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】
//去掉new customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数




查看FlinkKafkaProducer源码如下,我上面的写法有问题么?
public FlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
this(
topicId,
serializationSchema,
producerConfig,
customPartitioner.orElse(null),
Semantic.AT_LEAST_ONCE,
DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}




Re: 自定义partition,使用遇到问题,附代码

Posted by 冯嘉伟 <14...@qq.com>.
Hi!

    Optional.of(new customPartitioner())



Ye Chen wrote
> 各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。
> //自定义partition
> public class customPartitioner extends FlinkKafkaPartitioner
> <String>
>  {
>     @Override
>     public int partition(String record, byte[] key, byte[] value, String
> targetTopic, int[] partitions) {
>         return 0;
>     }
> }
> 
> 
> DataStream
> <String>
>  stream = 。。。
> FlinkKafkaProducer
> <String>
>  myProducer = new FlinkKafkaProducer<>(
>                 "test_topic",
>                 new SimpleStringSchema(),
>                 properties,
>                 new customPartitioner()
>         );
> stream.addSink(myProducer);
> 
> 
> //上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java:
> 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】
> //去掉new
> customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数
> 
> 
> 
> 
> 查看FlinkKafkaProducer源码如下,我上面的写法有问题么?
> public FlinkKafkaProducer(
> String topicId,
> SerializationSchema
> <IN>
>  serializationSchema,
> Properties producerConfig,
> Optional&lt;FlinkKafkaPartitioner&lt;IN&gt;> customPartitioner) {
> this(
> topicId,
> serializationSchema,
> producerConfig,
> customPartitioner.orElse(null),
> Semantic.AT_LEAST_ONCE,
> DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }





--
Sent from: http://apache-flink.147419.n8.nabble.com/