Posted to user@flink.apache.org by kla <la...@gmail.com> on 2017/10/24 07:24:22 UTC

Use a round-robin kafka partitioner

Hey, 

I would like to use a round-robin Kafka partitioner (the default one) in
Apache Flink.

I copied the code from Kafka's DefaultPartitioner class:

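import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.kafka.common.utils.Utils;

public class HashPartitioner<T> extends KafkaPartitioner<T> implements Serializable {

    // Counter for records without a key, seeded with a random start value.
    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    @Override
    public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
        if (serializedKey == null) {
            // No key: pick the next partition in round-robin order.
            int nextValue = counter.getAndIncrement();
            return toPositive(nextValue) % numPartitions;
        } else {
            // Hash the key bytes so that equal keys map to the same partition.
            return toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        }
    }

    // Clear the sign bit so that the modulo result is never negative.
    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }
}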
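
A side note on the toPositive helper above: the counter starts at a random
int and will eventually overflow, so the sign has to be handled somehow. The
following is a minimal, self-contained sketch (the class name SignMaskDemo is
made up for illustration and is not part of Kafka or Flink) comparing the bit
mask with Math.abs, which looks equivalent but is not:

```java
class SignMaskDemo {

    // Same mask as in the partitioner above: clears the sign bit.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        int numPartitions = 12;

        // Math.abs(Integer.MIN_VALUE) overflows and stays negative, so the
        // modulo would produce a negative (invalid) partition index.
        int viaAbs = Math.abs(Integer.MIN_VALUE) % numPartitions;

        // The mask maps Integer.MIN_VALUE to 0, which is a valid index.
        int viaMask = toPositive(Integer.MIN_VALUE) % numPartitions;

        System.out.println("Math.abs: " + viaAbs);
        System.out.println("mask:     " + viaMask);
    }
}
```

The mask does not preserve the magnitude of negative numbers, but for picking
a partition only a non-negative, evenly spread value matters.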

Is there a better way to do it?

Thanks,
Konstantin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Use a round-robin kafka partitioner

Posted by kla <la...@gmail.com>.
Thanks for your comment.
Even if I write the KafkaPartitioner wrapper, I still have to pass it a
*kafka.producer.Partitioner* somehow, which is not so easy.

So I found the easiest solution: you just have to pass /null/ as the
partitioner:

outputStream.addSink(new FlinkKafkaProducer010<>(
        producerProperties.getProperty(TOPIC),
        new EventSerializationSchema(),
        producerProperties,
        null));

This means that *FlinkKafkaProducer* will automatically use Kafka's
default partitioner.




Re: Use a round-robin kafka partitioner

Posted by Chesnay Schepler <ch...@apache.org>.
So you want to use the kafka partitioner directly?

How about an adapter?

public class KafkaPartitionerWrapper<T> extends KafkaPartitioner<T> implements Serializable {

    private final kafka.producer.Partitioner partitioner;

    public KafkaPartitionerWrapper(kafka.producer.Partitioner partitioner) {
        this.partitioner = partitioner;
    }

    // Same partition(...) signature as in the HashPartitioner from the first message.
    @Override
    public int partition(T record, byte[] key, byte[] value, int numPartitions) {
        // maybe pass Arrays.hashCode(key) instead
        return partitioner.partition(key, numPartitions);
    }
}
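
The comment about Arrays.hashCode in the adapter above matters if the wrapped
partitioner hashes the key via key.hashCode() (an assumption about whichever
partitioner is plugged in). Java arrays inherit an identity-based hashCode
from Object, so two byte[] keys with identical contents are not guaranteed to
hash alike. A small self-contained sketch; KeyHashDemo and partitionByHash are
hypothetical stand-ins, not Kafka code:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyHashDemo {

    // Stand-in for a partitioner that relies on key.hashCode().
    static int partitionByHash(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] first = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] second = "user-42".getBytes(StandardCharsets.UTF_8);

        // Content-based hash: equal bytes always map to the same partition.
        int p1 = partitionByHash(Arrays.hashCode(first), 12);
        int p2 = partitionByHash(Arrays.hashCode(second), 12);
        System.out.println("content hash:  " + p1 + " and " + p2);

        // Identity-based hash: the arrays are distinct objects, so these may differ.
        System.out.println("identity hash: "
                + partitionByHash(first, 12) + " and " + partitionByHash(second, 12));
    }
}
```

Passing Arrays.hashCode(key) therefore keeps records with the same key on the
same partition, which passing the raw byte[] does not guarantee.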


Re: Use a round-robin kafka partitioner

Posted by kla <la...@gmail.com>.
Exactly, that is what I did. The only difference is that I am using Flink
1.2.0, where the class is named KafkaPartitioner.

However, I would prefer not to "fork" Kafka's source code
(please see my first message).

Thanks,
Konstantin




Re: Use a round-robin kafka partitioner

Posted by Chesnay Schepler <ch...@apache.org>.
Hi!

You will have to modify your partitioner to extend FlinkKafkaPartitioner
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java>
instead.
You can then plug it into any Kafka sink through one of its constructors.
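
As a rough illustration of what such a partitioner could look like, here is a
minimal round-robin sketch. To keep it runnable without Flink on the
classpath it extends a local stand-in (PartitionerStandIn, hypothetical) that
mirrors the partition(record, key, value, targetTopic, partitions) method; in
a real job you would extend FlinkKafkaPartitioner instead. It also assumes
that each sink instance calls partition from a single thread, so a plain int
counter is enough.

```java
import java.io.Serializable;

class RoundRobinSketch {

    // Hypothetical stand-in for Flink's FlinkKafkaPartitioner (assumption:
    // only the partition method is needed for this illustration).
    abstract static class PartitionerStandIn<T> implements Serializable {
        abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
    }

    static class RoundRobinPartitioner<T> extends PartitionerStandIn<T> {
        private static final long serialVersionUID = 1L;

        // Position in the cycle; every partitioner instance keeps its own.
        private int next;

        @Override
        int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // Mask the sign bit so the index stays valid after the counter overflows.
            int index = (next++ & 0x7fffffff) % partitions.length;
            return partitions[index];
        }
    }

    public static void main(String[] args) {
        RoundRobinPartitioner<String> partitioner = new RoundRobinPartitioner<>();
        int[] partitions = {0, 1, 2};
        for (int i = 0; i < 5; i++) {
            int target = partitioner.partition("event-" + i, null, null, "demo-topic", partitions);
            System.out.println("event-" + i + " goes to partition " + target);
        }
    }
}
```

The sketch ignores the key entirely, so records with the same key are spread
over all partitions; whether that is acceptable depends on the consumers.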

Regards,
Chesnay


Re: Use a round-robin kafka partitioner

Posted by kla <la...@gmail.com>.
Hi Chesnay,

Thanks for your reply.

I would like to use the partitioner within the Kafka Sink operation.

By default, the Kafka sink uses FixedPartitioner:

	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
	}

I have 12 Kafka topic partitions but only 2 Flink partitions, so the
partitioning is unbalanced.
The Javadoc of the FixedPartitioner class says the following:

 *  Not all Kafka partitions contain data
 *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
 *  cause a lot of network connections between all the Flink instances and all the Kafka brokers

According to this, I have to use a round-robin Kafka partitioner. What is
the right way to do that?
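
To make the imbalance concrete, here is a small stand-alone simulation of the
numbers above (12 Kafka partitions, 2 Flink sink instances). It assumes that
a fixed partitioner sends every record of one sink instance to a single Kafka
partition, chosen as instance index modulo partition count; the class and
method names are made up for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

class PartitionCoverageDemo {

    // Assumed fixed behavior: one sink instance always writes to one partition.
    static Set<Integer> fixed(int sinkInstances, int kafkaPartitions) {
        Set<Integer> used = new TreeSet<>();
        for (int instance = 0; instance < sinkInstances; instance++) {
            used.add(instance % kafkaPartitions);
        }
        return used;
    }

    // Round-robin: every sink instance cycles through all partitions.
    static Set<Integer> roundRobin(int sinkInstances, int kafkaPartitions, int recordsPerInstance) {
        Set<Integer> used = new TreeSet<>();
        for (int instance = 0; instance < sinkInstances; instance++) {
            for (int record = 0; record < recordsPerInstance; record++) {
                used.add(record % kafkaPartitions);
            }
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println("fixed:       " + fixed(2, 12));
        System.out.println("round-robin: " + roundRobin(2, 12, 100));
    }
}
```

Under these assumptions the fixed strategy touches only 2 of the 12
partitions, while round-robin reaches all 12, at the cost of every sink
instance talking to every broker that hosts one of them.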

Thanks again.




Re: Use a round-robin kafka partitioner

Posted by Chesnay Schepler <ch...@apache.org>.
Could you expand a bit more on what you want to achieve?
(In particular, /where/ do you want to use this partitioner: as an operation
before a sink, or within a Kafka sink?)
