Posted to users@kafka.apache.org by Alex Melville <am...@g.hmc.edu> on 2015/02/20 09:34:51 UTC

Re: Can Mirroring Preserve Every Topic's Partition?

Neha and Guozhang,


This thread is several months old now but I'd like to follow up on it as I
have a couple more questions related to it.


1. Guozhang, you suggested 3 steps I take to ensure each piece of data
remains on the same partition from source to target cluster. In particular
you suggest

*2. When the consumer of the MM gets a message, put the message to the
producer's queue based on its partition id; i.e. if the partition id is n,
put to n's producer queue.*

*3. When producer sends the data, specify the partition id; so each
producer will only send to a single partition.*

Since the MM is just using the default producer/consumer with no custom
logic written by me, how am I supposed to find a particular message's
partition id, and once I do that how am I supposed to specify which
producer queue to put that message in?



2. What does specifying the number of producers in the MM do? Does each
producer only push to one partition within a cluster?




3. I wrote a SimpleProducer that publishes a record with a String key and
byte[] message to a Kafka cluster. Originally the producer config file I
was passing to MirrorMaker had the following important parameters:

    producer.type=sync
    compression.codec=none
    serializer.class=kafka.serializer.DefaultEncoder
    key.serializer.class=kafka.serializer.StringEncoder
    partitioner.class=org.rubicon.hmc.SimplePartitioner



where SimplePartitioner just took the String key, hashed it, took the
resulting number mod the number of partitions, and used that number as
the partition to publish to. However, I kept getting an error when the
producer tried to publish to the target cluster, which said something along
the lines of "[B cannot be cast to java.lang.String". I eventually figured
this was because I had specified
key.serializer.class=kafka.serializer.StringEncoder in the producer, but
what it was receiving from the consumer thread was just a byte[] msg, and
when it tried to use the StringEncoder an error was returned. Now I'm using
the default partitioner and default encoder, I'm not getting that
exception, and data is being copied from source to target cluster. However,
I've lost the ability to partition data on a String key, because the MM
producer is just given a raw byte array. My question is, am I correct in
this reasoning behind the "[B cannot be cast to java.lang.String" error?
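
For illustration, here is a minimal, self-contained sketch of the failure described in question 3 and one possible way around it. It does not use the Kafka API: the class and method names are hypothetical, the hashing scheme is only a guess at what a SimplePartitioner might do, and UTF-8 key encoding is an assumption. The first method casts the key to String, as a String-keyed partitioner would, and fails on a byte[]; the second accepts the raw bytes and decodes them before hashing, so both paths agree on the partition.

```java
import java.nio.charset.StandardCharsets;

final class ByteKeyPartitionerSketch {

    // Mimics a partitioner written for String keys. The cast throws
    // ClassCastException when the key arrives as byte[].
    static int stringKeyPartition(Object key, int numPartitions) {
        String k = (String) key;
        // Mask the sign bit so the result is never negative.
        return (k.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Accepts the raw key bytes, decodes them (UTF-8 is an assumption),
    // then hashes exactly as the String-keyed version does.
    static int byteKeyPartition(byte[] key, int numPartitions) {
        String k = new String(key, StandardCharsets.UTF_8);
        return (k.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] rawKey = "user-42".getBytes(StandardCharsets.UTF_8);
        try {
            stringKeyPartition(rawKey, 8);
        } catch (ClassCastException e) {
            System.out.println("String-keyed partitioner rejected byte[]: " + e.getMessage());
        }
        System.out.println("byte[] key -> partition " + byteKeyPartition(rawKey, 8));
    }
}
```

If the mirrored keys were originally written as UTF-8 strings, a byte-oriented partitioner along these lines would keep key-based partitioning without reintroducing the StringEncoder.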






Thank you in advance Neha, Guozhang, and the rest of the Kafka community!


Alex Melville

On Fri, Dec 5, 2014 at 5:30 PM, Neha Narkhede <ne...@confluent.io> wrote:

> Going back to your previous requirement of ensuring that the data in the
> target cluster is in the same order as the source cluster, all you need is
> to specify a key with every record in your data. The mirror maker and its
> producer take care of placing all the data for a particular key in the
> same partition on the target cluster. Effectively, all your data will be in
> the same order (though there may be a few duplicates as I mentioned
> before).
>
> Hope that helps!
>
> On Fri, Dec 5, 2014 at 1:23 PM, Alex Melville <am...@g.hmc.edu> wrote:
>
> > Thank you for your replies Guozhang and Neha, though I have some followup
> > questions.
> >
> > I wrote my own Java Consumer and Producer based off of the Kafka Producer
> > API and High Level Consumer. Let's call them MyConsumer and MyProducer.
> > MyProducer uses a custom Partitioner class called SimplePartitioner. In
> > the producer.config file that I specify when I run the MirrorMaker from
> > the command line, there is a parameter "partitioner.class". I keep
> > getting ClassNotFoundException errors, no matter if I put the absolute
> > path to my SimplePartitioner.class file, a relative path, or even when I
> > add SimplePartitioner.class to the $CLASSPATH variables created in the
> > kafka-run-class.sh script. Here is my output error:
> >
> > Exception in thread "main" java.lang.ClassNotFoundException:
> > SimplePartitioner.class
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:191)
> > at kafka.utils.Utils$.createObject(Utils.scala:438)
> > at kafka.producer.Producer.<init>(Producer.scala:60)
> > at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:116)
> > at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:106)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > at scala.collection.immutable.Range.foreach(Range.scala:81)
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > at scala.collection.immutable.Range.map(Range.scala:46)
> > at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:106)
> > at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> >
> > What is the correct value for the "partitioner.class" parameter in my
> > producer.properties config file?
> >
> >
> >
> >
> > Guozhang, in your reply to my original message you said "When the
> > consumer of the MM gets a message, put the message to the producer's
> > queue...". This seems to imply that I can specify my own custom Consumer
> > and Producer when I run the MirrorMaker. How can I do this? Or, if I'm
> > understanding incorrectly and I have to use whichever default
> > consumer/producer the MirrorMaker uses, how can I get that consumer to
> > learn which partition it's reading from, pass that info to the producer,
> > and then specify that partition ID when the producer writes to the
> > target cluster?
> >
> >
> > -Alex
> >
> > On Wed, Nov 26, 2014 at 10:33 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > Hello Alex,
> > >
> > > This can be done by doing some tweaks in the MM code (with the 0.8.2
> > > new producer).
> > >
> > > 1. Set up your MM to have the total # of producers equal to the # of
> > > partitions in source / target cluster.
> > >
> > > 2. When the consumer of the MM gets a message, put the message to the
> > > producer's queue based on its partition id; i.e. if the partition id is
> > > n, put to n's producer queue.
> > >
> > > 3. When producer sends the data, specify the partition id; so each
> > > producer will only send to a single partition.
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Nov 25, 2014 at 8:19 PM, Alex Melville <am...@g.hmc.edu>
> > > wrote:
> > >
> > > > Howdy friends,
> > > >
> > > >
> > > > I'd like to mirror the topics on several clusters to a central
> > > > cluster, and I'm looking at using the default MirrorMaker to do so.
> > > > I've already done some basic testing on the MirrorMaker found here:
> > > >
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
> > > >
> > > > and managed to successfully copy a topic's partitions on a source
> > > > cluster to a topic on a target cluster. So I'm able to mirror
> > > > correctly. However, for my particular use case I need to ensure that
> > > > when I copy a topic's partitions from source cluster to target
> > > > cluster, a partition created on the target cluster contains data in
> > > > the exact same order as the data on the corresponding partition on
> > > > the source cluster.
> > > >
> > > > I'm thinking of writing a Simple Consumer so I can manually compare
> > > > the events in a source cluster's partition with the corresponding
> > > > partition on the target cluster, but I'm not 100% sure if I'll be
> > > > able to verify my guarantee if I do it this way. Can anyone here
> > > > verify that partitions copied over to the target cluster by the
> > > > default MirrorMaker are an exact copy of those on the source cluster?
> > > >
> > > >
> > > > Thanks in advance,
> > > >
> > > > Alex Melville
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> Thanks,
> Neha
>
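
A note on the ClassNotFoundException quoted above: the stack trace shows the producer passing the configured value straight to Class.forName, which expects a fully qualified class name (for example org.rubicon.hmc.SimplePartitioner, as in the later config), not a file name or path. The class, or the jar containing it, also has to be on the classpath. The sketch below illustrates only the naming rule with a JDK class; the helper name canLoad is hypothetical.

```java
final class ClassNameLookupSketch {

    // Returns true when the JVM can load a class by the given name.
    static boolean canLoad(String name) {
        try {
            Class.forName(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A fully qualified class name resolves.
        System.out.println("java.lang.String       -> " + canLoad("java.lang.String"));
        // The same name with a ".class" file suffix does not.
        System.out.println("java.lang.String.class -> " + canLoad("java.lang.String.class"));
    }
}
```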

Re: Can Mirroring Preserve Every Topic's Partition?

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
The MirrorMaker in trunk now supports mirroring with preserved partitions.
You can wire in a message handler to assign partitions for each producer
record before handing them to the producer.
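
As a rough illustration of that idea (not the actual MirrorMaker handler interface), a handler only needs to copy the source partition id onto the outgoing record. ConsumedRecord and OutgoingRecord below are hypothetical stand-ins for the consumer-side and producer-side record types, written with plain JDK classes so the sketch runs on its own.

```java
import java.nio.charset.StandardCharsets;

final class PartitionPreservingHandlerSketch {

    // Hypothetical stand-in for a consumed message and its source metadata.
    static final class ConsumedRecord {
        final String topic;
        final int partition;
        final byte[] key;
        final byte[] value;

        ConsumedRecord(String topic, int partition, byte[] key, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for a producer record that names its partition.
    static final class OutgoingRecord {
        final String topic;
        final int partition;
        final byte[] key;
        final byte[] value;

        OutgoingRecord(String topic, int partition, byte[] key, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    // Carries the source partition id over unchanged, so the producer does
    // not choose a partition by itself.
    static OutgoingRecord handle(ConsumedRecord in) {
        return new OutgoingRecord(in.topic, in.partition, in.key, in.value);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] value = "payload".getBytes(StandardCharsets.UTF_8);
        OutgoingRecord out = handle(new ConsumedRecord("clicks", 3, key, value));
        System.out.println(out.topic + " -> partition " + out.partition);
    }
}
```

This only preserves ordering if the target topic has at least as many partitions as the source topic.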

Jiangjie (Becket) Qin

On 3/31/15, 3:41 AM, "Ivan Balashov" <ib...@gmail.com> wrote:

>Alex,
>
>Just wondering, did you have any success patching and running MM with exact
>partitioning support?
>If so, could you possibly share the patch and, as I hope, your positive
>experience with the process?
>
>Thanks!


Re: Can Mirroring Preserve Every Topic's Partition?

Posted by Alex Melville <am...@g.hmc.edu>.
Hi Ivan,


Sorry to take so long in getting back to you. We made our own changes to
the MirrorMaker before we saw Jiangjie Qin say that the trunk had exact
partition mapping. I think it's best to use the current trunk's MM as
opposed to ours because it probably has more test coverage. Let me
know how it goes!


Alex

On Tue, Mar 31, 2015 at 3:41 AM, Ivan Balashov <ib...@gmail.com> wrote:

> Alex,
>
> Just wondering, did you have any success patching and running MM with exact
> partitioning support?
> If so, could you possibly share the patch and, as I hope, your positive
> experience with the process?
>
> Thanks!
>

Re: Can Mirroring Preserve Every Topic's Partition?

Posted by Ivan Balashov <ib...@gmail.com>.
Alex,

Just wondering, did you have any success patching and running MM with exact
partitioning support?
If so, could you possibly share the patch and, as I hope, your positive
experience with the process?

Thanks!

Re: Can Mirroring Preserve Every Topic's Partition?

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Alex,

Sorry for being late to this thread.

What I originally meant is not the changes in KAFKA-1650 itself, but a
slightly newer version of MM that follows up on KAFKA-1650. Details can be
seen here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-3+-+Mirror+Maker+Enhancement

https://issues.apache.org/jira/browse/KAFKA-1997

It has a record handler function and a consumer rebalance callback
function.

1. Using the rebalance callback, you can make sure that whenever the number
of source partitions changes, the destination partitions are updated
accordingly, so that the source / destination clusters will always have the
same partitions for each topic.

2. Using the record handler function, you can transform the consumed
MessageAndMetadata into a ProducerRecord, explicitly setting the partition
id in the ProducerRecord to the partition id of the MessageAndMetadata.
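
The first point, keeping destination partition counts in step with the source, might be sketched as below. This is only the bookkeeping a rebalance callback could run, not the callback interface itself: the class and method names are hypothetical, and the partition counts are passed in as plain maps rather than fetched from a cluster.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PartitionCountSyncSketch {

    // For each source topic, returns the partition count the destination
    // topic must grow to. Topics already at or above the source count are
    // omitted, since Kafka partition counts can only be increased.
    static Map<String, Integer> partitionsToGrow(Map<String, Integer> source,
                                                 Map<String, Integer> destination) {
        Map<String, Integer> plan = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : source.entrySet()) {
            int have = destination.getOrDefault(e.getKey(), 0);
            if (have < e.getValue()) {
                plan.put(e.getKey(), e.getValue());
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Integer> source = new LinkedHashMap<>();
        source.put("clicks", 8);
        source.put("views", 4);

        Map<String, Integer> destination = new LinkedHashMap<>();
        destination.put("clicks", 4);
        destination.put("views", 4);

        // Only "clicks" lags behind the source and needs more partitions.
        System.out.println(partitionsToGrow(source, destination));
    }
}
```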

Guozhang


On Tue, Feb 24, 2015 at 9:26 AM, Alex Melville <am...@g.hmc.edu> wrote:

> Guozhang,
>
> Thank you for the reply, but could you be a little bit more detailed?
>
> When will this new MM with exact mirroring be rolled out? I went to the
> following URL to read up on KAFKA-1650
> https://issues.apache.org/jira/browse/KAFKA-1650
> but that issue doesn't appear to be about mirroring, but rather about clean
> shutdowns.
>
> What exactly would I need to alter in the MM code? I've cloned the 0.8.2.0
> Kafka source and read through the kafka.tools.MirrorMaker.scala class, and
> cannot find a single mention of partition within its source. Looking at the
> ProducerThread and ConsumerThread classes, I would expect them to be passed
> some parameter that corresponds to the partition number of the message
> they're producing/consuming. However, the closest thing I see to anything
> partition related is the "data.key" object.
>
> Some more guidance on this would be extremely helpful as this exact
> mirroring process seems like a common use case, and it's not clear how one
> should go about doing this.
>
>
> Thanks Guozhang,
>
> Alex Melville
>
> On Sun, Feb 22, 2015 at 3:53 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Alex,
> >
> > What I originally meant is that you probably need to manually modify the
> > MM code in order to achieve your needs. However, MM has been improved a
> > lot since last time we synced up; in the next major release the MM will
> > support exact mirroring (details in KAFKA-1650) with some functional
> > extensions. Basically you just need to ensure the source and destination
> > clusters have the same partitions for each topic, which can be done via
> > consumer rebalance callbacks.
> >
> > Guozhang
> >



-- 
-- Guozhang

Re: Can Mirroring Preserve Every Topic's Partition?

Posted by Alex Melville <am...@g.hmc.edu>.
Guozhang,

Thank you for the reply, but could you be a little bit more detailed?

When will this new MM with exact mirroring be rolled out? I went to the
following URL to read up on KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
but that issue doesn't appear to be about mirroring, but rather about clean
shutdowns.

What exactly would I need to alter in the MM code? I've cloned the 0.8.2.0
Kafka source and read through the kafka.tools.MirrorMaker.scala class, and
cannot find a single mention of partition within its source. Looking at the
ProducerThread and ConsumerThread classes, I would expect them to be passed
some parameter that corresponds to the partition number of the message
they're producing/consuming. However, the closest thing I see to anything
partition related is the "data.key" object.

Some more guidance on this would be extremely helpful as this exact
mirroring process seems like a common use case, and it's not clear how one
should go about doing this.


Thanks Guozhang,

Alex Melville

On Sun, Feb 22, 2015 at 3:53 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Alex,
>
> What I originally meant is that you probably need to manually modify the MM
> code in order to achieve your needs. However, MM has been improved a lot
> since last time we synced up; in the next major release the MM will support
> exact mirroring (details in KAFKA-1650) with some functional extensions.
> Basically you just need to ensure the source and destination clusters have
> the same partitions for each topic, which can be done via consumer
> rebalance callbacks.
>
> Guozhang
>
> On Fri, Feb 20, 2015 at 12:34 AM, Alex Melville <am...@g.hmc.edu>
> wrote:
>
> > Neha and Guozhang,
> >
> >
> > This thread is several months old now but I'd like to follow up on it as
> I
> > have a couple more questions related to it.
> >
> >
> > 1. Guozhang, you suggested 3 steps I take to ensure each piece of data
> > remains on the same partition from source to target cluster. In
> particular
> > you suggest
> >
> >
> > *        2. When the consumer of the MM gets a message, put the message
> to
> > the*
> > *producer's queue based on its partition id; i.e. if the partition id is
> > n,*
> >
> >
> > *put to n's producer queue.*
> > *3. When producer sends the data, specify the partition id; so each
> > producer*        *will only send to a single partition.*
> >
> > Since the MM is just using the default producer/consumer with no custom
> > logic written by me, how am I supposed to find a particular message's
> > partition id, and once I do that how am I supposed to specify which
> > producer queue to put that message in?
> >
> >
> >
> > 2. What does specifying the number of producer's in the MM do? Does each
> > producer only push to one partition within a cluster?
> >
> >
> >
> >
> > 3. I wrote a SimpleProducer that publishes a record with a String key and
> > byte[] message to a kafka cluster. Originally the producer config file I
> > was passing to Mirrormaker had the following important parameters
> >
> >
> >
> > *producer.type=sync*
> >
> > *compression.codec=none*
> >
> > *serializer.class=kafka.serializer.DefaultEncoder*
> > *key.serializer.class=kafka.serializer.StringEncoder*
> > *partitioner.class=org.rubicon.hmc.SimplePartitioner*
> >
> >
> >
> > where SimplePartitioner just took the String key, hashed it, and then
> took
> > the resulting number mod the number of partitions and used that number as
> > the partition to publish to. However I kept getting an error when the
> > producer tried to publish to the target cluster which said something
> along
> > the lines of "[B cannot be cast to java.lang.String". I at last figured
> > this was because I had specified
> > key.serializer.class=kafka.serializer.StringEncoder in the producer, but
> > what it was receiving from the consumer thread was just a byte[] msg and
> > when it tried to use the StringEncoder an error was returned. Now I'm
> using
> > the default partitionar and default encoder, I'm not getting that
> > exception, and data is being copied from source to target cluster.
> However
> > I've lost the ability to partition data on a String key, because the MM
> > producer is just given a raw byte array. My question is, am I correct in
> > this reasoning behind the "[B cannot be cast to java.lang.String" error?
> >
> >
> >
> >
> >
> >
> > Thank you in advance Neha, Guozhang, and the rest of the Kafka community!
> >
> >
> > Alex Melville
> >
> > On Fri, Dec 5, 2014 at 5:30 PM, Neha Narkhede <ne...@confluent.io> wrote:
> >
> > > Going back to your previous requirement of ensuring that the data in
> the
> > > target cluster is in the same order as the source cluster, all you need
> > is
> > > to specify a key with every record in your data. The mirror maker and
> its
> > > producer takes care of placing all the data for a particular key in the
> > > same partition on the target cluster. Effectively, all your data will
> be
> > in
> > > the same order (though there may be a few duplicates as I mentioned
> > > before).
> > >
> > > Hope that helps!
> > >
> > > On Fri, Dec 5, 2014 at 1:23 PM, Alex Melville <am...@g.hmc.edu>
> > wrote:
> > >
> > > > Thank you for your replies Guozhang and Neha, though I have some
> > followup
> > > > questions.
> > > >
> > > > I wrote my own Java Consumer and Producer based off of the Kafka
> > Producer
> > > > API and High Level Consumer. Let's call them MyConsumer and
> MyProducer.
> > > > MyProducer uses a custom Partitioner class called SimplePartitioner.
> In
> > > the
> > > > producer.config file that I specify when I run the MirrorMaker from
> the
> > > > command line, there is a parameter "partitioner.class". I keep
> getting
> > > > "ClassDefNotFoundException exceptions, no matter if I put the
> absolute
> > > path
> > > > to my SimplePartitioner.class file, a relative path, or even when I
> add
> > > > SimplePartitioner.class to the $CLASSPATH variables created in the
> > > > kafka-run-class.sh script. Here is my output error:
> > > >
> > > > Exception in thread "main" java.lang.ClassNotFoundException:
> > > > SimplePartitioner.class
> > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > > at java.lang.Class.forName0(Native Method)
> > > > at java.lang.Class.forName(Class.java:191)
> > > > at kafka.utils.Utils$.createObject(Utils.scala:438)
> > > > at kafka.producer.Producer.<init>(Producer.scala:60)
> > > > at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:116)
> > > > at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:106)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > > > at scala.collection.immutable.Range.foreach(Range.scala:81)
> > > > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > > > at scala.collection.immutable.Range.map(Range.scala:46)
> > > > at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:106)
> > > > at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> > > >
> > > > What is the correct value for the "partitioner.class" parameter in my
> > > > producer.properties config file?
> > > >
> > > >
> > > >
> > > >
> > > > Guozhang, in your reply to my original message you said "When the
> > > consumer
> > > > of the MM gets a message, put the message to the producer's
> queue...".
> > > This
> > > > seems to imply that I can specify my own custom Consumer and Producer
> > > when
> > > > I run the Mirrormaker. How can I do this? Or, if I'm understand
> > > incorrectly
> > > > and I have to use whichever default consumer/producer the Mirrormaker
> > > uses,
> > > > how can I get that consumer to learn which partition it's reading
> from,
> > > > pass that info to the producer, and then specify that partition ID
> when
> > > the
> > > > producer rights to the target cluster?
> > > >
> > > >
> > > > -Alex
> > > >
> > > > On Wed, Nov 26, 2014 at 10:33 AM, Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Alex,
> > > > >
> > > > > This can be done by doing some tweaks in the MM code (with the
> 0.8.2
> > > new
> > > > > producer).
> > > > >
> > > > > 1. Set-up your MM to have the total # of producers equal to the #.
> of
> > > > > partitions in source / target cluster.
> > > > >
> > > > > 2. When the consumer of the MM gets a message, put the message to
> the
> > > > > producer's queue based on its partition id; i.e. if the partition
> id
> > is
> > > > n,
> > > > > put to n's producer queue.
> > > > >
> > > > > 3. When producer sends the data, specify the partition id; so each
> > > > producer
> > > > > will only send to a single partition.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Nov 25, 2014 at 8:19 PM, Alex Melville <
> amelville@g.hmc.edu>
> > > > > wrote:
> > > > >
> > > > > > Howdy friends,
> > > > > >
> > > > > >
> > > > > > I'd like to mirror the topics on several clusters to a central
> > > cluster,
> > > > > and
> > > > > > I'm looking at using the default Mirrormaker to do so. I've
> already
> > > > done
> > > > > > some basic testing on the Mirrormaker found here:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
> > > > > >
> > > > > > and managed to successfully copy a topic's partitions on a source
> > > > cluster
> > > > > > to a topic on a target cluster. So I'm able to mirror correctly.
> > > > However
> > > > > > for my particular use case I need to ensure that when I copy a
> > > topic's
> > > > > > partitions from source cluster to target cluster, a partition
> > created
> > > > on
> > > > > > the target cluster contains data in the exact same order as the
> > data
> > > on
> > > > > the
> > > > > > corresponding partition on the source cluster.
> > > > > >
> > > > > > I'm thinking of writing a Simple Consumer so I can manually
> compare
> > > the
> > > > > > events in a source cluster's partition with the corresponding
> > > partition
> > > > > on
> > > > > > the target cluster, but I'm not 100% sure if I'll be able to
> verify
> > > my
> > > > > > guarantee if I do it this way. Can anyone here verify that
> > partitions
> > > > > > copied over to the target cluster by the default Mirrormaker are
> an
> > > > exact
> > > > > > copy of those on the source cluster?
> > > > > >
> > > > > >
> > > > > > Thanks in advance,
> > > > > >
> > > > > > Alex Melville
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Can Mirroring Preserve Every Topic's Partition?

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Alex,

What I originally meant is that you would probably need to modify the MM
code yourself to get what you need. However, MM has improved a lot
since we last synced up: in the next major release MM will support
exact mirroring (details in KAFKA-1650) with some functional extensions.
Basically you just need to ensure the source and destination clusters have
the same partitions for each topic, which can be done via consumer
rebalance callbacks.
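
For anyone following the three steps from earlier in this thread (one
producer per partition, route each message by its source partition id,
send with an explicit partition), the routing idea can be sketched in
plain Java. This is only an illustration and not MirrorMaker code:
PartitionPreservingRouter, MirrorRecord, route and drain are
hypothetical names, and a real version would hand each drained record
to a Kafka producer with the partition id set explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of per-partition routing; not part of Kafka. */
class PartitionPreservingRouter {

    /** A consumed message plus the source partition it was read from. */
    static final class MirrorRecord {
        final int sourcePartition;
        final byte[] key;
        final byte[] value;

        MirrorRecord(int sourcePartition, byte[] key, byte[] value) {
            this.sourcePartition = sourcePartition;
            this.key = key;
            this.value = value;
        }
    }

    // Step 1: one queue (and, in a real setup, one producer) per partition.
    private final List<BlockingQueue<MirrorRecord>> queues = new ArrayList<>();

    PartitionPreservingRouter(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            queues.add(new LinkedBlockingQueue<MirrorRecord>());
        }
    }

    /** Step 2: put the record on the queue matching its source partition. */
    void route(MirrorRecord record) {
        int p = record.sourcePartition;
        if (p < 0 || p >= queues.size()) {
            throw new IllegalArgumentException("unknown partition " + p);
        }
        queues.get(p).add(record);
    }

    /**
     * Step 3: the producer for partition n takes everything queued for n,
     * in arrival order, and would send it to partition n on the target.
     */
    List<MirrorRecord> drain(int partition) {
        List<MirrorRecord> out = new ArrayList<>();
        queues.get(partition).drainTo(out);
        return out;
    }
}
```

Because each queue is FIFO and is read by a single producer, records
for a given partition leave in the order they were routed. Retries and
failures can still reorder or duplicate messages on the target, so
this sketch alone does not guarantee an exact copy.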

Guozhang

On Fri, Feb 20, 2015 at 12:34 AM, Alex Melville <am...@g.hmc.edu> wrote:

> Neha and Guozhang,
>
>
> This thread is several months old now but I'd like to follow up on it as I
> have a couple more questions related to it.
>
>
> 1. Guozhang, you suggested 3 steps I take to ensure each piece of data
> remains on the same partition from source to target cluster. In particular
> you suggest
>
>
> *2. When the consumer of the MM gets a message, put the message to the
> producer's queue based on its partition id; i.e. if the partition id is n,
> put to n's producer queue.*
> *3. When producer sends the data, specify the partition id; so each
> producer will only send to a single partition.*
>
> Since the MM is just using the default producer/consumer with no custom
> logic written by me, how am I supposed to find a particular message's
> partition id, and once I do that how am I supposed to specify which
> producer queue to put that message in?
>
>
>
> 2. What does specifying the number of producers in the MM do? Does each
> producer only push to one partition within a cluster?
>
>
>
>
> 3. I wrote a SimpleProducer that publishes a record with a String key and
> byte[] message to a kafka cluster. Originally the producer config file I
> was passing to Mirrormaker had the following important parameters
>
>
>
> *producer.type=sync*
>
> *compression.codec=none*
>
> *serializer.class=kafka.serializer.DefaultEncoder*
> *key.serializer.class=kafka.serializer.StringEncoder*
> *partitioner.class=org.rubicon.hmc.SimplePartitioner*
>
>
>
> where SimplePartitioner just took the String key, hashed it, and then took
> the resulting number mod the number of partitions and used that number as
> the partition to publish to. However I kept getting an error when the
> producer tried to publish to the target cluster which said something along
> the lines of "[B cannot be cast to java.lang.String". I at last figured
> this was because I had specified
> key.serializer.class=kafka.serializer.StringEncoder in the producer, but
> what it was receiving from the consumer thread was just a byte[] msg and
> when it tried to use the StringEncoder an error was returned. Now I'm using
> the default partitioner and default encoder, I'm not getting that
> exception, and data is being copied from source to target cluster. However
> I've lost the ability to partition data on a String key, because the MM
> producer is just given a raw byte array. My question is, am I correct in
> this reasoning behind the "[B cannot be cast to java.lang.String" error?
>
>
>
>
>
>
> Thank you in advance Neha, Guozhang, and the rest of the Kafka community!
>
>
> Alex Melville
>
> On Fri, Dec 5, 2014 at 5:30 PM, Neha Narkhede <ne...@confluent.io> wrote:
>
> > Going back to your previous requirement of ensuring that the data in the
> > target cluster is in the same order as the source cluster, all you need
> is
> > to specify a key with every record in your data. The mirror maker and its
> > producer takes care of placing all the data for a particular key in the
> > same partition on the target cluster. Effectively, all your data will be
> in
> > the same order (though there may be a few duplicates as I mentioned
> > before).
> >
> > Hope that helps!
> >
> > On Fri, Dec 5, 2014 at 1:23 PM, Alex Melville <am...@g.hmc.edu>
> wrote:
> >
> > > Thank you for your replies Guozhang and Neha, though I have some
> followup
> > > questions.
> > >
> > > I wrote my own Java Consumer and Producer based off of the Kafka
> Producer
> > > API and High Level Consumer. Let's call them MyConsumer and MyProducer.
> > > MyProducer uses a custom Partitioner class called SimplePartitioner. In
> > the
> > > producer.config file that I specify when I run the MirrorMaker from the
> > > command line, there is a parameter "partitioner.class". I keep getting
> > > ClassNotFoundException errors, no matter if I put the absolute
> > path
> > > to my SimplePartitioner.class file, a relative path, or even when I add
> > > SimplePartitioner.class to the $CLASSPATH variables created in the
> > > kafka-run-class.sh script. Here is my output error:
> > >
> > > Exception in thread "main" java.lang.ClassNotFoundException:
> > > SimplePartitioner.class
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > at java.lang.Class.forName0(Native Method)
> > > at java.lang.Class.forName(Class.java:191)
> > > at kafka.utils.Utils$.createObject(Utils.scala:438)
> > > at kafka.producer.Producer.<init>(Producer.scala:60)
> > > at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:116)
> > > at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:106)
> > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > > at scala.collection.immutable.Range.foreach(Range.scala:81)
> > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > > at scala.collection.immutable.Range.map(Range.scala:46)
> > > at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:106)
> > > at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> > >
> > > What is the correct value for the "partitioner.class" parameter in my
> > > producer.properties config file?
> > >
> > >
> > >
> > >
> > > Guozhang, in your reply to my original message you said "When the
> > consumer
> > > of the MM gets a message, put the message to the producer's queue...".
> > This
> > > seems to imply that I can specify my own custom Consumer and Producer
> > when
> > > I run the Mirrormaker. How can I do this? Or, if I'm understanding
> > incorrectly
> > > and I have to use whichever default consumer/producer the Mirrormaker
> > uses,
> > > how can I get that consumer to learn which partition it's reading from,
> > > pass that info to the producer, and then specify that partition ID when
> > the
> > > producer writes to the target cluster?
> > >
> > >
> > > -Alex
> > >
> > > On Wed, Nov 26, 2014 at 10:33 AM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >
> > > > Hello Alex,
> > > >
> > > > This can be done by doing some tweaks in the MM code (with the 0.8.2
> > new
> > > > producer).
> > > >
> > > > 1. Set-up your MM to have the total # of producers equal to the #. of
> > > > partitions in source / target cluster.
> > > >
> > > > 2. When the consumer of the MM gets a message, put the message to the
> > > > producer's queue based on its partition id; i.e. if the partition id
> is
> > > n,
> > > > put to n's producer queue.
> > > >
> > > > 3. When producer sends the data, specify the partition id; so each
> > > producer
> > > > will only send to a single partition.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Nov 25, 2014 at 8:19 PM, Alex Melville <am...@g.hmc.edu>
> > > > wrote:
> > > >
> > > > > Howdy friends,
> > > > >
> > > > >
> > > > > I'd like to mirror the topics on several clusters to a central
> > cluster,
> > > > and
> > > > > I'm looking at using the default Mirrormaker to do so. I've already
> > > done
> > > > > some basic testing on the Mirrormaker found here:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
> > > > >
> > > > > and managed to successfully copy a topic's partitions on a source
> > > cluster
> > > > > to a topic on a target cluster. So I'm able to mirror correctly.
> > > However
> > > > > for my particular use case I need to ensure that when I copy a
> > topic's
> > > > > partitions from source cluster to target cluster, a partition
> created
> > > on
> > > > > the target cluster contains data in the exact same order as the
> data
> > on
> > > > the
> > > > > corresponding partition on the source cluster.
> > > > >
> > > > > I'm thinking of writing a Simple Consumer so I can manually compare
> > the
> > > > > events in a source cluster's partition with the corresponding
> > partition
> > > > on
> > > > > the target cluster, but I'm not 100% sure if I'll be able to verify
> > my
> > > > > guarantee if I do it this way. Can anyone here verify that
> partitions
> > > > > copied over to the target cluster by the default Mirrormaker are an
> > > exact
> > > > > copy of those on the source cluster?
> > > > >
> > > > >
> > > > > Thanks in advance,
> > > > >
> > > > > Alex Melville
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Neha
> >
>



-- 
-- Guozhang