You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Ye Chen <ch...@163.com> on 2021/02/20 07:26:27 UTC
自定义partition,使用遇到问题,附代码
各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。
//自定义partition
public class customPartitioner extends FlinkKafkaPartitioner<String> {
@Override
public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return 0;
}
}
DataStream<String> stream = 。。。
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"test_topic",
new SimpleStringSchema(),
properties,
new customPartitioner()
);
stream.addSink(myProducer);
//上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java: 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】
//去掉new customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数
查看FlinkKafkaProducer源码如下,我上面的写法有问题么?
public FlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
this(
topicId,
serializationSchema,
producerConfig,
customPartitioner.orElse(null),
Semantic.AT_LEAST_ONCE,
DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
Re: 自定义partition,使用遇到问题,附代码
Posted by 冯嘉伟 <14...@qq.com>.
Hi!
Optional.of(new customPartitioner())
Ye Chen wrote
> 各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。
> //自定义partition
> public class customPartitioner extends FlinkKafkaPartitioner
> <String>
> {
> @Override
> public int partition(String record, byte[] key, byte[] value, String
> targetTopic, int[] partitions) {
> return 0;
> }
> }
>
>
> DataStream
> <String>
> stream = 。。。
> FlinkKafkaProducer
> <String>
> myProducer = new FlinkKafkaProducer<>(
> "test_topic",
> new SimpleStringSchema(),
> properties,
> new customPartitioner()
> );
> stream.addSink(myProducer);
>
>
> //上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java:
> 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】
> //去掉new
> customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数
>
>
>
>
> 查看FlinkKafkaProducer源码如下,我上面的写法有问题么?
> public FlinkKafkaProducer(
> String topicId,
> SerializationSchema
> <IN>
> serializationSchema,
> Properties producerConfig,
> Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
> this(
> topicId,
> serializationSchema,
> producerConfig,
> customPartitioner.orElse(null),
> Semantic.AT_LEAST_ONCE,
> DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }
--
Sent from: http://apache-flink.147419.n8.nabble.com/