Posted to users@kafka.apache.org by Haoming Zhang <ha...@outlook.com> on 2014/11/26 02:42:45 UTC

Partition key not working properly



Hi all,

I'm struggling to use the partition key mechanism properly. My approach is to set the number of partitions to 3, create three partition keys "0", "1" and "2", and then use those keys to create three KeyedMessages:
KeyedMessage(topic, "0", message),
KeyedMessage(topic, "1", message), 
KeyedMessage(topic, "2", message)

After this, I create a producer instance and send out all the KeyedMessages.

I expected each KeyedMessage to go to a different partition according to its partition key, which means:
KeyedMessage(topic, "0", message) goes to partition 0,
KeyedMessage(topic, "1", message) goes to partition 1,
KeyedMessage(topic, "2", message) goes to partition 2

I'm using Kafka-web-console to watch the topic status, but the result is not what I expected. The KeyedMessages still go to partitions randomly; sometimes two KeyedMessages end up in the same partition even though they have different partition keys.

I'm not sure whether my logic is incorrect or I misunderstood the partition key mechanism. Any sample code or explanation would be great!

Thanks,
Haoming


Re: Partition key not working properly

Posted by Jun Rao <ju...@gmail.com>.
The issue is probably that the hashCode of a byte[] is based on the reference
(object identity) instead of the value. You can try binding the producer to
[Array[Byte], Array[Byte]] and using ByteArrayPartitioner. Alternatively, you
can bind the producer to [String, Array[Byte]] and the default partitioner
should work.

Thanks,

Jun
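A minimal Scala sketch of the difference Jun describes, runnable without Kafka on the classpath. It assumes a hash partitioner of the form abs(hash) % numPartitions; the sign-bit mask stands in for Kafka's own abs helper, and the names KeyHashDemo, identityPartition, contentPartition, stringPartition and keyBytes are hypothetical. java.util.Arrays.hashCode is used as an example of a content-based hash, the kind a byte-array-aware partitioner needs.

```scala
import java.nio.charset.StandardCharsets

// Sketch only: assumes a hash partitioner of the form abs(hash) % numPartitions.
// Masking the sign bit stands in for Kafka's own abs helper (an assumption).
object KeyHashDemo {
  // Non-negative hash modulo the partition count.
  def toPartition(hash: Int, numPartitions: Int): Int =
    (hash & 0x7fffffff) % numPartitions

  // Array[Byte] is a Java byte[]: its hashCode comes from Object (object identity),
  // so two arrays with identical bytes can land in different partitions.
  def identityPartition(key: Array[Byte], numPartitions: Int): Int =
    toPartition(key.hashCode, numPartitions)

  // Content-based hash: arrays with identical bytes always agree.
  def contentPartition(key: Array[Byte], numPartitions: Int): Int =
    toPartition(java.util.Arrays.hashCode(key), numPartitions)

  // String.hashCode is computed from the characters, so equal strings agree.
  def stringPartition(key: String, numPartitions: Int): Int =
    toPartition(key.hashCode, numPartitions)

  // Encodes a key like send(String, String) above does: a fresh array on every call.
  def keyBytes(key: String): Array[Byte] = key.getBytes(StandardCharsets.UTF_8)
}
```

Calling keyBytes("1") twice gives two distinct arrays with equal contents: contentPartition returns 2 for both, while identityPartition depends on which array object is passed, which would explain why a hard-coded key "1" was spread across partitions. Under the same assumed formula, the String keys "0", "1" and "2" (hash codes 48, 49 and 50) land in partitions 0, 1 and 2 of a 3-partition topic.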

On Wed, Nov 26, 2014 at 12:54 PM, Haoming Zhang <ha...@outlook.com>
wrote:

> Hi François,
>
> I agree with you and Svante, so I think my logic is correct. But I really
> can't find why my problem happens and I was stuck here for weeks. I think
> posting some of my codes might be helpful, could you please give the codes
> a quick check?
>
> Here is the producer codes, I didn't use the custom partitioner.class :
>   val props = new Properties()
>
>   val codec = if(compress) DefaultCompressionCodec.codec else
> NoCompressionCodec.codec
>
>   props.put("compression.codec", codec.toString)
>   props.put("producer.type", if(synchronously) "sync" else "async")
>   props.put("metadata.broker.list", brokerList)
>   props.put("batch.num.messages", batchSize.toString)
>   props.put("message.send.max.retries", messageSendMaxRetries.toString)
>   props.put("request.required.acks",requestRequiredAcks.toString)
>   props.put("client.id",clientId.toString)
>
>   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
>
>   def kafkaMesssage(message: Array[Byte], partition: Array[Byte]):
> KeyedMessage[AnyRef, AnyRef] = {
>      if (partition == null) {
>        new KeyedMessage(topic,message)
>      } else {
>        new KeyedMessage(topic,partition,message)
>      }
>   }
>
>   def send(message: String, partition: String = null): Unit =
> send(message.getBytes("UTF8"), if (partition == null) null else
> partition.getBytes("UTF8"))
>
>   def send(message: Array[Byte], partition: Array[Byte]): Unit = {
>     try {
>       producer.send(kafkaMesssage(message, partition))
>     } catch {
>       case e: Exception =>
>         e.printStackTrace
>         System.exit(1)
>     }
>   }
>
> And here is how I use the producer, create a producer instance, then use
> this instance to send three message. Currently I create the partition key
> as Integer, then convert it to Byte Arrays:
>   val testMessage = UUID.randomUUID().toString
>   val testTopic = "sample1"
>   val groupId_1 = "testGroup"
>
>   var testStatus = false
>
>   print("starting sample broker testing")
>   val producer = new KafkaProducer(testTopic, "localhost:9092")
>
>   val numList = List(0,1,2);
>   for (a <- numList) {
>     var key = java.nio.ByteBuffer.allocate(4).putInt(a).array() // Create
> a partition key as Byte Array
>     producer.send(testMessage.getBytes("UTF8"), key)
>   }
>
> I appreciate all the suggestions and help!
>
> Thanks,
> Haoming
>
> > From: f.langelier@gmail.com
> > Date: Wed, 26 Nov 2014 19:57:13 +0000
> > Subject: Re: Partition key not working properly
> > To: users@kafka.apache.org
> >
> > Hi haoming,
> >
> > As far as I know, svante is right.
> >
> > Maybe you modified your default partitioner?
> >
> > or are you sure the same key go to different partitions? maybe its just 2
> > keys that are going to the same partition?
> >
> > Because it's possible that you have something like that
> > - key "1" -> partition 3
> > - key "2" -> partition 2
> > - key "3" -> partition 3
> >
> > and nothing in partition 1
> >
> > On Wed Nov 26 2014 at 02:35:33 Haoming Zhang <ha...@outlook.com>
> > wrote:
> >
> > > Hi Svante,
> > >
> > > Thanks for your reply!
> > >
> > > As you said, my purpose is let "all messages with the same key goes to
> the
> > > same partition", but the actual case is even I hard code the same
> partition
> > > key(let's say the key is "1") for three messages, the messages are
> still
> > > goes to different partitions.
> > >
> > > Regards,
> > > Haoming
> > >
> > > > Date: Wed, 26 Nov 2014 08:03:04 +0100
> > > > Subject: Re: Partition key not working properly
> > > > From: saka@csi.se
> > > > To: users@kafka.apache.org
> > > >
> > > > By default, the partition key is used for hashing then it's placed
> in a
> > > > partition that has the appropriate hashed keyspace.
> > > >
> > > > If you have three physical partitions and then give the partition
> key "5"
> > > > it has nothing to do with physical partition 5 (that does not exist)
> ,
> > > > similar to physical: partition = hash("5") mod 3
> > > >
> > > >
> > > > The only guarantee is that all messages with the same key goes to the
> > > same
> > > > partition. This is useful to make sure that for example all logs
> from the
> > > > same ip goest to the same partition which means that they can be
> read by
> > > > the same producer.
> > > >
> > > > /svante
> > > >
> > > >
> > > >
> > > > 2014-11-26 2:42 GMT+01:00 Haoming Zhang <ha...@outlook.com>:
> > > >
> > > > >
> > > > >
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I'm struggling with how to use the partition key mechanism
> properly. My
> > > > > logic is set the partition number as 3, then  create three
> partition
> > > keys
> > > > > as "0", "1", "2", then use the partition keys to create three
> > > KeyedMessage
> > > > > such as
> > > > > KeyedMessage(topic, "0", message),
> > > > > KeyedMessage(topic, "1", message),
> > > > > KeyedMessage(topic, "2", message)
> > > > >
> > > > > After this, creating a producer instance to send out all the
> > > KeyedMessage.
> > > > >
> > > > > I expecting each KeyedMessage should enter to different partitions
> > > > > according to the different partition keys, which means
> > > > > KeyedMessage(topic, "0", message) go to Partition 0,
> > > > > KeyedMessage(topic, "1", message) go to Partition 1,
> > > > > KeyedMessage(topic, "2", message) go to Partition 2
> > > > >
> > > > > I'm using Kafka-web-console to watch the topic status, but the
> result
> > > is
> > > > > not like what I'm expecting. KeyedMessage still go to partitions
> > > randomly,
> > > > > some times two KeyedMessage will enter the same partition even they
> > > have
> > > > > different partition keys, .
> > > > >
> > > > > Not sure whether my logic is incorrect or I didn't understand the
> > > > > partition key mechanism correctly. Anyone could provides some
> sample
> > > code
> > > > > or explanation would be great!
> > > > >
> > > > > Thanks,
> > > > > Haoming
> > > > >
> > > > >
> > >
>
>

RE: Partition key not working properly

Posted by Haoming Zhang <ha...@outlook.com>.
Hi François,

I agree with you and Svante, so I think my logic is correct. But I really can't find why my problem happens, and I've been stuck on it for weeks. I think posting some of my code might help; could you please give it a quick check?

Here is the producer code. I didn't set a custom partitioner.class:
  val props = new Properties()

  val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

  props.put("compression.codec", codec.toString)
  props.put("producer.type", if (synchronously) "sync" else "async")
  props.put("metadata.broker.list", brokerList)
  props.put("batch.num.messages", batchSize.toString)
  props.put("message.send.max.retries", messageSendMaxRetries.toString)
  props.put("request.required.acks", requestRequiredAcks.toString)
  props.put("client.id", clientId.toString)

  // Note: with AnyRef keys that hold an Array[Byte], the default partitioner
  // hashes the array reference rather than its contents (see Jun Rao's reply).
  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def kafkaMessage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
    if (partition == null) {
      new KeyedMessage(topic, message)
    } else {
      new KeyedMessage(topic, partition, message)
    }
  }

  def send(message: String, partition: String = null): Unit =
    send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

  def send(message: Array[Byte], partition: Array[Byte]): Unit = {
    try {
      producer.send(kafkaMessage(message, partition))
    } catch {
      case e: Exception =>
        e.printStackTrace()
        System.exit(1)
    }
  }

And here is how I use the producer: I create a producer instance, then use it to send three messages. Currently I create each partition key as an Int, then convert it to a byte array:
  val testMessage = UUID.randomUUID().toString
  val testTopic = "sample1"
  val groupId_1 = "testGroup"

  var testStatus = false

  print("starting sample broker testing")
  val producer = new KafkaProducer(testTopic, "localhost:9092")

  val numList = List(0, 1, 2)
  for (a <- numList) {
    val key = java.nio.ByteBuffer.allocate(4).putInt(a).array() // Create a partition key as a byte array
    producer.send(testMessage.getBytes("UTF8"), key)
  }

I appreciate all the suggestions and help!

Thanks,
Haoming
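Following Jun Rao's suggestion in this thread, a minimal sketch of producer properties for the two options, built with plain java.util.Properties so it runs without Kafka. The property names (partitioner.class, key.serializer.class, serializer.class) and class names are those of the 0.8 Scala producer as I understand them; the helper names are hypothetical, and brokerList stands in for the value used in the code above.

```scala
import java.util.Properties

object KeyedProducerProps {
  // Option 1 (hypothetical helper): keep Array[Byte] keys but hash their contents.
  // The producer would then be typed Producer[Array[Byte], Array[Byte]].
  def byteArrayKeyProps(brokerList: String): Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", brokerList)
    props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")
    props
  }

  // Option 2 (hypothetical helper): String keys with the default partitioner.
  // The producer would then be typed Producer[String, Array[Byte]]; keys need a
  // String encoder, while message values keep the byte-array encoder.
  def stringKeyProps(brokerList: String): Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", brokerList)
    props.put("key.serializer.class", "kafka.serializer.StringEncoder")
    props.put("serializer.class", "kafka.serializer.DefaultEncoder")
    props
  }
}
```

With option 2, the loop above would pass a.toString as the key (for example new KeyedMessage[String, Array[Byte]](topic, a.toString, bytes)) instead of the ByteBuffer bytes.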

> From: f.langelier@gmail.com
> Date: Wed, 26 Nov 2014 19:57:13 +0000
> Subject: Re: Partition key not working properly
> To: users@kafka.apache.org

Re: Partition key not working properly

Posted by François Langelier <f....@gmail.com>.
Hi Haoming,

As far as I know, Svante is right.

Maybe you modified your default partitioner?

Or are you sure the same key goes to different partitions? Maybe it's just two
different keys that are going to the same partition?

Because it's possible that you have something like this:
- key "1" -> partition 2
- key "2" -> partition 1
- key "3" -> partition 2

and nothing in partition 0
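A small sketch of the point that distinct keys can share a partition, assuming a String-key hash partitioner of the form abs(hashCode) % numPartitions; KeyCollisionDemo, partitionFor and assign are hypothetical names. With three partitions, any four distinct keys must put at least two of them in the same partition.

```scala
object KeyCollisionDemo {
  // Assumed hash partitioner: non-negative hashCode modulo the partition count.
  def partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // Groups keys by the partition they would land in.
  def assign(keys: Seq[String], numPartitions: Int): Map[Int, Seq[String]] =
    keys.groupBy(partitionFor(_, numPartitions))
}
```

For example, assign(Seq("0", "1", "2", "3"), 3) puts "0" and "3" (hash codes 48 and 51) together in partition 0, even though the keys differ.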

On Wed Nov 26 2014 at 02:35:33 Haoming Zhang <ha...@outlook.com>
wrote:

> Hi Svante,
>
> Thanks for your reply!
>
> As you said, my purpose is let "all messages with the same key goes to the
> same partition", but the actual case is even I hard code the same partition
> key(let's say the key is "1") for three messages, the messages are still
> goes to different partitions.
>
> Regards,
> Haoming
>
> > Date: Wed, 26 Nov 2014 08:03:04 +0100
> > Subject: Re: Partition key not working properly
> > From: saka@csi.se
> > To: users@kafka.apache.org
> >
> > By default, the partition key is used for hashing then it's placed in a
> > partition that has the appropriate hashed keyspace.
> >
> > If you have three physical partitions and then give the partition key "5"
> > it has nothing to do with physical partition 5 (that does not exist) ,
> > similar to physical: partition = hash("5") mod 3
> >
> >
> > The only guarantee is that all messages with the same key goes to the
> same
> > partition. This is useful to make sure that for example all logs from the
> > same ip goest to the same partition which means that they can be read by
> > the same producer.
> >
> > /svante
> >
> >
> >
> > 2014-11-26 2:42 GMT+01:00 Haoming Zhang <ha...@outlook.com>:
> >
> > >
> > >
> > >
> > > Hi all,
> > >
> > > I'm struggling with how to use the partition key mechanism properly. My
> > > logic is set the partition number as 3, then  create three partition
> keys
> > > as "0", "1", "2", then use the partition keys to create three
> KeyedMessage
> > > such as
> > > KeyedMessage(topic, "0", message),
> > > KeyedMessage(topic, "1", message),
> > > KeyedMessage(topic, "2", message)
> > >
> > > After this, creating a producer instance to send out all the
> KeyedMessage.
> > >
> > > I expecting each KeyedMessage should enter to different partitions
> > > according to the different partition keys, which means
> > > KeyedMessage(topic, "0", message) go to Partition 0,
> > > KeyedMessage(topic, "1", message) go to Partition 1,
> > > KeyedMessage(topic, "2", message) go to Partition 2
> > >
> > > I'm using Kafka-web-console to watch the topic status, but the result
> is
> > > not like what I'm expecting. KeyedMessage still go to partitions
> randomly,
> > > some times two KeyedMessage will enter the same partition even they
> have
> > > different partition keys, .
> > >
> > > Not sure whether my logic is incorrect or I didn't understand the
> > > partition key mechanism correctly. Anyone could provides some sample
> code
> > > or explanation would be great!
> > >
> > > Thanks,
> > > Haoming
> > >
> > >
>

RE: Partition key not working properly

Posted by Haoming Zhang <ha...@outlook.com>.
Hi Svante,

Thanks for your reply!

As you said, my goal is that "all messages with the same key go to the same partition", but what actually happens is that even when I hard-code the same partition key (let's say "1") for three messages, the messages still go to different partitions.

Regards,
Haoming

> Date: Wed, 26 Nov 2014 08:03:04 +0100
> Subject: Re: Partition key not working properly
> From: saka@csi.se
> To: users@kafka.apache.org

Re: Partition key not working properly

Posted by svante karlsson <sa...@csi.se>.
By default, the partition key is hashed, and the message is placed in the
partition that the hash maps to.

If you have three physical partitions and use the partition key "5", that has
nothing to do with physical partition 5 (which does not exist); instead it is
roughly: physical partition = hash("5") mod 3.

The only guarantee is that all messages with the same key go to the same
partition. This is useful to make sure that, for example, all logs from the
same IP go to the same partition, which means they can be read by the same
consumer.

/svante
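A worked version of the formula above, assuming the hash is String.hashCode for String keys and java.util.Arrays.hashCode for byte-array keys, reduced with a non-negative modulo; KeyToPartition and its methods are hypothetical names. It shows that a key's value is not its partition number: "5" lands in partition 2, and the 4-byte big-endian Int keys 0, 1 and 2 built in Haoming's loop would land in partitions 1, 2 and 0 if hashed by content.

```scala
import java.nio.ByteBuffer

object KeyToPartition {
  // Assumed: physical partition = non-negative hash mod partition count.
  def mod(hash: Int, numPartitions: Int): Int = (hash & 0x7fffffff) % numPartitions

  // String keys: the hash is computed from the characters.
  def forString(key: String, numPartitions: Int): Int = mod(key.hashCode, numPartitions)

  // Int keys encoded as 4 big-endian bytes (ByteBuffer.allocate(4).putInt(i).array()),
  // hashed by content rather than by array identity.
  def forIntBytes(i: Int, numPartitions: Int): Int =
    mod(java.util.Arrays.hashCode(ByteBuffer.allocate(4).putInt(i).array()), numPartitions)
}
```

Here "5".hashCode is 53, and 53 mod 3 is 2; the byte-array hash of [0, 0, 0, i] is 923521 + i, which is 1 + i mod 3.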



2014-11-26 2:42 GMT+01:00 Haoming Zhang <ha...@outlook.com>:

>
>
>
> Hi all,
>
> I'm struggling with how to use the partition key mechanism properly. My
> logic is set the partition number as 3, then  create three partition keys
> as "0", "1", "2", then use the partition keys to create three KeyedMessage
> such as
> KeyedMessage(topic, "0", message),
> KeyedMessage(topic, "1", message),
> KeyedMessage(topic, "2", message)
>
> After this, creating a producer instance to send out all the KeyedMessage.
>
> I expecting each KeyedMessage should enter to different partitions
> according to the different partition keys, which means
> KeyedMessage(topic, "0", message) go to Partition 0,
> KeyedMessage(topic, "1", message) go to Partition 1,
> KeyedMessage(topic, "2", message) go to Partition 2
>
> I'm using Kafka-web-console to watch the topic status, but the result is
> not like what I'm expecting. KeyedMessage still go to partitions randomly,
> some times two KeyedMessage will enter the same partition even they have
> different partition keys, .
>
> Not sure whether my logic is incorrect or I didn't understand the
> partition key mechanism correctly. Anyone could provides some sample code
> or explanation would be great!
>
> Thanks,
> Haoming
>
>