Posted to users@kafka.apache.org by Chris Curtin <cu...@gmail.com> on 2013/02/25 20:50:38 UTC
'simple' partitioning in 0.8
Hi,
In an earlier thread about partitioning in 0.8, I read that you can pass a key
to the KeyedMessage constructor and all messages with the same key will end
up in the same partition, even if you don't provide a partitioning function
(as opposed to messages being assigned to partitions at random).
When I do this, I get a runtime error:
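Key-based partitioning usually comes down to hashing the key and taking the result modulo the partition count, so equal keys always map to the same partition. The sketch below assumes that hash-modulo scheme (my understanding of the default behaviour when a key is supplied and no partitioner is configured, not something verified in this thread). `HashPartitionSketch` is a hypothetical name, and the partition count is hard-coded here, whereas a real producer gets it from topic metadata.

```java
// Hypothetical sketch: hash-modulo partitioning, assumed to resemble the
// default behaviour when a key is given and no partitioner is configured.
public class HashPartitionSketch {

    // Mask off the sign bit so negative hash codes still give a valid index.
    public static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // Each key is hashed independently, so repeating a key always yields the same partition.
        for (int key = 0; key < 6; key++) {
            System.out.println("key " + key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

For Integer keys the hash code is the value itself, so keys 0 to 5 map to partitions 0, 1, 2, 0, 1, 2.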
Code:
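import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[1]);   // events per block
        int blocks = Integer.parseInt(args[2]);  // number of blocks (the block number is the key)

        Random rnd = new Random();

        Properties props = new Properties();
        props.put("broker.list", "vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092");
        // Only the value serializer is set; no key.serializer.class is configured.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);

        Producer<Integer, String> producer = new Producer<Integer, String>(config);
        for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + "," + rnd.nextInt(1000);
                // Integer key: all events of one block should land in the same partition.
                KeyedMessage<Integer, String> data =
                    new KeyedMessage<Integer, String>("test1", nBlocks, msg);
                producer.send(data);
            }
        }
        producer.close();
    }
}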
Runtime error:
0 [main] INFO kafka.utils.VerifiableProperties - Verifying properties
33 [main] INFO kafka.utils.VerifiableProperties - Property broker.list is overridden to vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092
33 [main] INFO kafka.utils.VerifiableProperties - Property serializer.class is overridden to kafka.serializer.StringEncoder
Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:126)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:123)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:32)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:123)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.silverpop.kafka.playproducer.TestProducer.main(TestProducer.java:31)
Changing the code to use my own partitioner, which takes a String key instead
of the Integer key above, works correctly. So do I always need to define a
partitioning function?
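The custom partitioner mentioned above isn't shown. A partitioner only has to map a key and the partition count to a partition index, so one hypothetical version for String keys such as "0", "1", ... (the block number as text) could look like the sketch below. `KeyPartitioner` is a local stand-in defined so the sketch compiles on its own, not Kafka's partitioner interface, and `BlockPartitioner` is a hypothetical name; in a real producer the class would be registered through the `partitioner.class` property.

```java
// Local stand-in for a partitioner contract (not Kafka's own interface):
// given a key and the number of partitions, return a partition index.
interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical partitioner for String keys like "0", "1", "2" (a block number as text).
public class BlockPartitioner implements KeyPartitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        try {
            // Numeric keys: consecutive block numbers go to consecutive partitions.
            // floorMod keeps the result non-negative even for negative numbers.
            return Math.floorMod(Integer.parseInt(key), numPartitions);
        } catch (NumberFormatException e) {
            // Any other key: fall back to a sign-safe hash of the string.
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }

    public static void main(String[] args) {
        BlockPartitioner partitioner = new BlockPartitioner();
        for (String key : new String[] {"0", "1", "2", "3", "block-a"}) {
            System.out.println(key + " -> partition " + partitioner.partition(key, 3));
        }
    }
}
```

Parsing numeric keys spreads consecutive blocks evenly across partitions, while the hash fallback keeps any other key stable without failing on unexpected input.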
Thanks,
Chris
Re: 'simple' partitioning in 0.8
Posted by Neha Narkhede <ne...@gmail.com>.
Chris,
Partitioning is independent of serialization. The key data is serialized
using the serializer specified through the "key.serializer.class" property
in the Producer. It defaults to a no-op encoder if you don't specify one.
Here, since you want to use an integer key, you'd have to plug in an
Encoder that can serialize integer data to a byte array.
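The stack trace in the original message shows `StringEncoder.toBytes` being applied to the Integer key, so in that setup the key appears to have been serialized with the encoder configured in `serializer.class`. A key encoder for integers only needs to turn an Integer into bytes. The sketch below assumes big-endian 4-byte encoding; `Encoder` here is a local stand-in mirroring the single `toBytes` method of `kafka.serializer.Encoder`, defined so the code compiles without Kafka, and `IntegerEncoder` is a hypothetical class name. In a real 0.8 producer, I believe the class must also have a public constructor taking `kafka.utils.VerifiableProperties`, since the producer instantiates encoders by reflection.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Local stand-in for kafka.serializer.Encoder<T> (single toBytes method),
// defined here only so the sketch compiles without the Kafka jar.
interface Encoder<T> {
    byte[] toBytes(T t);
}

// Hypothetical key encoder: writes an Integer as 4 big-endian bytes.
// Real use (assumption): implement kafka.serializer.Encoder<Integer>, add a public
// IntegerEncoder(kafka.utils.VerifiableProperties props) constructor, and set
// props.put("key.serializer.class", "<your package>.IntegerEncoder").
public class IntegerEncoder implements Encoder<Integer> {
    @Override
    public byte[] toBytes(Integer value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Inverse of toBytes, handy for checking the round trip.
    public static int fromBytes(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        IntegerEncoder encoder = new IntegerEncoder();
        byte[] bytes = encoder.toBytes(258);
        System.out.println(Arrays.toString(bytes));  // [0, 0, 1, 2]
        System.out.println(fromBytes(bytes));        // 258
    }
}
```

With a key encoder like this configured separately from `serializer.class`, the Integer keys in the test loop would no longer pass through `StringEncoder`.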
Thanks,
Neha