Posted to user@flink.apache.org by kla <la...@gmail.com> on 2017/10/24 07:24:22 UTC
Use a round-robin kafka partitioner
Hey,
I would like to use a round-robin Kafka partitioner in Apache Flink (the default one).
I forked Kafka's code from the DefaultPartitioner class:
public class HashPartitioner<T> extends KafkaPartitioner<T> implements Serializable {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    @Override
    public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
        if (serializedKey == null) {
            // key is null: round-robin over the partitions (counter is seeded randomly)
            int nextValue = counter.getAndIncrement();
            return toPositive(nextValue) % numPartitions;
        } else {
            // hash the key bytes to choose a partition
            return toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        }
    }

    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }
}
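One detail worth keeping from the Kafka code above: toPositive masks off the sign bit instead of calling Math.abs, because Math.abs(Integer.MIN_VALUE) overflows and stays negative, which would make `% numPartitions` yield a negative partition index. A self-contained sketch of the difference:

```java
public class ToPositiveDemo {

    // Same helper as in the partitioner above: clears the sign bit,
    // so the result is non-negative for every int input.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        // Math.abs overflows at Integer.MIN_VALUE and returns a negative value.
        System.out.println(Math.abs(Integer.MIN_VALUE)); // -2147483648
        System.out.println(toPositive(Integer.MIN_VALUE)); // 0
        System.out.println(toPositive(-1)); // 2147483647
    }
}
```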
Is there a better way to do it?
Thanks,
Konstantin
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Use a round-robin kafka partitioner
Posted by kla <la...@gmail.com>.
Thanks for your comment.
If I write the KafkaPartitioner anyway, I have to somehow pass in the
*kafka.producer.Partitioner*, which is not so easy.
So I found the easiest solution for this: you just pass a /null/ value:
outputStream.addSink(new FlinkKafkaProducer010<>(
        producerProperties.getProperty(TOPIC),
        new EventSerializationSchema(),
        producerProperties,
        null));
This means that *FlinkKafkaProducer* will automatically use Kafka's
default partitioner.
Re: Use a round-robin kafka partitioner
Posted by Chesnay Schepler <ch...@apache.org>.
So you want to use the Kafka partitioner directly?
How about an adapter?
public class KafkaPartitionerWrapper<T> extends KafkaPartitioner<T> implements Serializable {

    private final kafka.producer.Partitioner partitioner;

    public KafkaPartitionerWrapper(kafka.producer.Partitioner partitioner) {
        this.partitioner = partitioner;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // maybe pass Arrays.hashCode(key) instead
        return partitioner.partition(key, partitions.length);
    }
}
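For illustration, here is a self-contained version of the same adapter idea. OldPartitioner is a stand-in for the real kafka.producer.Partitioner interface (whose partition method takes a key object and a partition count), so the snippet compiles without the Kafka and Flink dependencies; it also applies the Arrays.hashCode(key) suggestion from the comment above:

```java
import java.io.Serializable;
import java.util.Arrays;

// Stand-in for kafka.producer.Partitioner: partition(key, numPartitions).
interface OldPartitioner {
    int partition(Object key, int numPartitions);
}

public class KafkaPartitionerAdapterDemo<T> implements Serializable {

    private final OldPartitioner partitioner;

    public KafkaPartitionerAdapterDemo(OldPartitioner partitioner) {
        this.partitioner = partitioner;
    }

    // Flink-style signature: receives the ids of the partitions this sink can write to.
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Pass a hash of the serialized key bytes to the old-style partitioner;
        // a null key simply hashes to 0 here.
        Object wrappedKey = key == null ? 0 : Arrays.hashCode(key);
        // Map the chosen index back onto the actual partition ids.
        return partitions[partitioner.partition(wrappedKey, partitions.length)];
    }
}
```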
On 25.10.2017 09:58, kla wrote:
> Exactly, I did like this, the only thing is that I am using 1.2.0 version of
> Flink and in this version the class name is KafkaPartitioner. [...]
Re: Use a round-robin kafka partitioner
Posted by kla <la...@gmail.com>.
Exactly, I did it like this; the only thing is that I am using Flink version
1.2.0, and in this version the class name is KafkaPartitioner.
But the problem is that I would rather not "fork" Kafka's source code.
(Please check my first comment.)
Thanks,
Konstantin
Re: Use a round-robin kafka partitioner
Posted by Chesnay Schepler <ch...@apache.org>.
Hi!
You will have to modify your partitioner to implement the FlinkKafkaPartitioner
interface
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java>
instead.
You can then plug this into any Kafka sink through one of the constructors.
Regards,
Chesnay
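A round-robin partitioner against that interface could look roughly like this (a sketch: the partition signature matches the FlinkKafkaPartitioner link above, but the class is kept free of Flink imports so only the cycling logic is shown):

```java
import java.io.Serializable;

// Sketch: in Flink this would be `extends FlinkKafkaPartitioner<T>` and the
// method below would carry @Override; the round-robin logic itself is plain Java.
public class RoundRobinPartitioner<T> implements Serializable {

    private int next = -1;

    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Cycle through the partition ids handed to this sink instance,
        // ignoring the key entirely.
        next = (next + 1) % partitions.length;
        return partitions[next];
    }
}
```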
On 24.10.2017 22:15, kla wrote:
> Hi Chesnay,
>
> Thanks for your reply.
>
> I would like to use the partitioner within the Kafka Sink operation. [...]
Re: Use a round-robin kafka partitioner
Posted by kla <la...@gmail.com>.
Hi Chesnay,
Thanks for your reply.
I would like to use the partitioner within the Kafka Sink operation.
By default the Kafka sink uses the FixedPartitioner:

public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
    this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
}
So I have 12 Kafka topic partitions and 2 Flink partitions, and I get
unbalanced partitioning.
According to the Javadoc in the FixedPartitioner class:

* Not all Kafka partitions contain data.
* To avoid such an unbalanced partitioning, use a round-robin Kafka
partitioner (note that this will cause a lot of network connections between
all the Flink instances and all the Kafka brokers).

According to this, I have to use a round-robin Kafka partitioner. What is
the right way to do it?
Thanks again.
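The imbalance described above follows from how the FixedPartitioner pins each parallel sink subtask to a single Kafka partition, roughly parallelInstanceId % numPartitions per its Javadoc. With sink parallelism 2 and 12 partitions, only two partitions ever receive data, as this self-contained illustration shows:

```java
import java.util.Set;
import java.util.TreeSet;

public class FixedPartitionerDemo {

    // The pinning scheme the FixedPartitioner Javadoc describes:
    // each sink subtask always writes to the same partition.
    static int fixedPartition(int parallelInstanceId, int numPartitions) {
        return parallelInstanceId % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 12;
        int flinkParallelism = 2;
        Set<Integer> used = new TreeSet<>();
        for (int subtask = 0; subtask < flinkParallelism; subtask++) {
            used.add(fixedPartition(subtask, numPartitions));
        }
        // Only partitions 0 and 1 of the 12 are ever written to.
        System.out.println(used); // [0, 1]
    }
}
```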
Re: Use a round-robin kafka partitioner
Posted by Chesnay Schepler <ch...@apache.org>.
Could you expand a bit more on what you want to achieve?
(In particular, /where/ do you want to use this partitioner: as an operation
before a sink, or within a Kafka sink?)
On 24.10.2017 09:24, kla wrote:
> Hey,
>
> I would like to use a round-robin kafka partitioner in the apache flink.
> (the default one) [...]