Posted to users@kafka.apache.org by Chris Curtin <cu...@gmail.com> on 2013/02/25 20:50:38 UTC

'simple' partitioning in 0.8

Hi,

In an earlier thread about partitioning in 0.8 I read that you can provide
a key to the KeyedMessage constructor, and all messages with the same key
will end up in the same partition, even if you don't provide a partitioner
(as opposed to the random assignment of messages to partitions).

When I do this I get a runtime error:

Code:

        // Imports used by this snippet:
        //   java.util.Date, java.util.Properties, java.util.Random,
        //   kafka.javaapi.producer.Producer, kafka.producer.KeyedMessage,
        //   kafka.producer.ProducerConfig
        long events = Long.parseLong(args[1]);
        int blocks = Integer.parseInt(args[2]);

        Random rnd = new Random();

        Properties props = new Properties();
        props.put("broker.list", "vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);

        Producer<Integer, String> producer = new Producer<Integer, String>(config);
        for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + "," + rnd.nextInt(1000);
                // Integer key (nBlocks), with only serializer.class configured
                KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>("test1", nBlocks, msg);
                producer.send(data);
            }
        }
        producer.close();

Runtime error:

0    [main] INFO  kafka.utils.VerifiableProperties  - Verifying properties
33   [main] INFO  kafka.utils.VerifiableProperties  - Property broker.list is overridden to vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092
33   [main] INFO  kafka.utils.VerifiableProperties  - Property serializer.class is overridden to kafka.serializer.StringEncoder
Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:126)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:123)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:32)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:123)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.silverpop.kafka.playproducer.TestProducer.main(TestProducer.java:31)

Changing the logic to use my own partitioner, with a String key instead of
the Integer key above, works correctly. So do I always need to define a
partitioning function?

Thanks,

Chris

Re: 'simple' partitioning in 0.8

Posted by Neha Narkhede <ne...@gmail.com>.
Chris,

Partitioning is independent of serialization. The key is serialized using
the encoder specified through the "key.serializer.class" property on the
producer; if you don't set it, it falls back to "serializer.class", which is
why your Integer key ended up in the StringEncoder. Since you want to use an
integer key, you'd have to plug in an Encoder that can serialize integer
data to a byte array.
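
For illustration, a minimal Integer encoder along those lines might look
like this (the class and package names are placeholders):

import java.nio.ByteBuffer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

// Illustrative encoder that turns an Integer key into 4 big-endian bytes.
public class IntegerEncoder implements Encoder<Integer> {

    // The 0.8 producer instantiates encoders reflectively with the config,
    // so keep a VerifiableProperties constructor.
    public IntegerEncoder(VerifiableProperties props) {
    }

    public byte[] toBytes(Integer key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }
}

You would then configure it alongside the message serializer:

    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("key.serializer.class", "com.example.IntegerEncoder");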

Thanks,
Neha

