You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Prabhjot Bharaj <pr...@gmail.com> on 2015/07/28 20:01:01 UTC

KAfka Mirror Maker

Hi,

I'm using Mirror Maker with a cluster of 3 nodes and cluster of 5 nodes.

I would like to ask - is the number of nodes a restriction for Mirror Maker?
Also, are there any other restrictions or properties that should be common
across both the clusters so that they continue mirroring.


I'm asking this because I've got this error while mirroring:-

[2015-07-28 17:51:10,943] WARN Fetching topic metadata with correlation id
0 for topics [Set(fromIndiaWithLove)] from broker
[id:3,host:a10.2.3.4,port:9092] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)

at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)

at kafka.producer.SyncProducer.send(SyncProducer.scala:113)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)

at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-07-28 17:51:18,955] WARN Fetching topic metadata with correlation id
0 for topics [Set(fromIndiaWithLove)] from broker
[id:2,host:10.2.3.5,port:9092] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)

at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)

at kafka.producer.SyncProducer.send(SyncProducer.scala:113)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)

at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-07-28 17:51:27,043] WARN Fetching topic metadata with correlation id
0 for topics [Set(fromIndiaWithLove)] from broker
[id:5,host:a10.2.3.6port:9092] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)

at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)

at kafka.producer.SyncProducer.send(SyncProducer.scala:113)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)

at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



This is what my *consumer config* looks like:-

*zookeeper.connect=10.2.3.4:2182 <http://10.2.3.4:2182>*

*zookeeper.connection.timeout.ms
<http://zookeeper.connection.timeout.ms>=1000000*

*consumer.timeout.ms <http://consumer.timeout.ms>=-1*

*group.id <http://group.id>=dp-mirrorMaker-test-datap1*

*shallow.iterator.enable=true*

*auto.create.topics.enable=true*



I've used the default* producer.properties* in kafka/config/ which has
these properteis:-

*metadata.broker.list=localhost:9092*


*producer.type=sync*

*compression.codec=none*


*serializer.class=kafka.serializer.DefaultEncoder*


I'm running Mirror Maker via this command:-


 /kafka_2.10-0.8.2.0/bin/kafka-run-class.sh kafka.tools.MirrorMaker
--consumer.config ~/sourceCluster1Consumer.config  --num.streams 1
--producer.config producer.properties --whitelist=".*"

Regards,

prabcs

Re: KAfka Mirror Maker

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Mirror Maker does not have specific restrictions on cluster size.

The error you saw was because consumer was not able to talk to the broker.
Can you try to use kafka-console-consumer to consume some data from your
source cluster and see if it works? It should be under KAFKA_HOME/bin/

Jiangjie (Becket) Qin

On Tue, Jul 28, 2015 at 11:01 AM, Prabhjot Bharaj <pr...@gmail.com>
wrote:

> Hi,
>
> I'm using Mirror Maker with a cluster of 3 nodes and cluster of 5 nodes.
>
> I would like to ask - is the number of nodes a restriction for Mirror
> Maker?
> Also, are there any other restrictions or properties that should be common
> across both the clusters so that they continue mirroring.
>
>
> I'm asking this because I've got this error while mirroring:-
>
> [2015-07-28 17:51:10,943] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(fromIndiaWithLove)] from broker
> [id:3,host:a10.2.3.4,port:9092] failed (kafka.client.ClientUtils$)
>
> java.nio.channels.ClosedChannelException
>
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>
> at
>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> [2015-07-28 17:51:18,955] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(fromIndiaWithLove)] from broker
> [id:2,host:10.2.3.5,port:9092] failed (kafka.client.ClientUtils$)
>
> java.nio.channels.ClosedChannelException
>
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>
> at
>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> [2015-07-28 17:51:27,043] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(fromIndiaWithLove)] from broker
> [id:5,host:a10.2.3.6port:9092] failed (kafka.client.ClientUtils$)
>
> java.nio.channels.ClosedChannelException
>
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>
> at
>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
>
> This is what my *consumer config* looks like:-
>
> *zookeeper.connect=10.2.3.4:2182 <http://10.2.3.4:2182>*
>
> *zookeeper.connection.timeout.ms
> <http://zookeeper.connection.timeout.ms>=1000000*
>
> *consumer.timeout.ms <http://consumer.timeout.ms>=-1*
>
> *group.id <http://group.id>=dp-mirrorMaker-test-datap1*
>
> *shallow.iterator.enable=true*
>
> *auto.create.topics.enable=true*
>
>
>
> I've used the default* producer.properties* in kafka/config/ which has
> these properteis:-
>
> *metadata.broker.list=localhost:9092*
>
>
> *producer.type=sync*
>
> *compression.codec=none*
>
>
> *serializer.class=kafka.serializer.DefaultEncoder*
>
>
> I'm running Mirror Maker via this command:-
>
>
>  /kafka_2.10-0.8.2.0/bin/kafka-run-class.sh kafka.tools.MirrorMaker
> --consumer.config ~/sourceCluster1Consumer.config  --num.streams 1
> --producer.config producer.properties --whitelist=".*"
>
> Regards,
>
> prabcs
>

Re: KAfka Mirror Maker

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Mirror Maker does not have specific restrictions on cluster size.

The error you saw was because consumer was not able to talk to the broker.
Can you try to use kafka-console-consumer to consume some data from your
source cluster and see if it works? It should be under KAFKA_HOME/bin/

Jiangjie (Becket) Qin

On Tue, Jul 28, 2015 at 11:01 AM, Prabhjot Bharaj <pr...@gmail.com>
wrote:

> Hi,
>
> I'm using Mirror Maker with a cluster of 3 nodes and cluster of 5 nodes.
>
> I would like to ask - is the number of nodes a restriction for Mirror
> Maker?
> Also, are there any other restrictions or properties that should be common
> across both the clusters so that they continue mirroring.
>
>
> I'm asking this because I've got this error while mirroring:-
>
> [2015-07-28 17:51:10,943] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(fromIndiaWithLove)] from broker
> [id:3,host:a10.2.3.4,port:9092] failed (kafka.client.ClientUtils$)
>
> java.nio.channels.ClosedChannelException
>
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>
> at
>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> [2015-07-28 17:51:18,955] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(fromIndiaWithLove)] from broker
> [id:2,host:10.2.3.5,port:9092] failed (kafka.client.ClientUtils$)
>
> java.nio.channels.ClosedChannelException
>
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>
> at
>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> [2015-07-28 17:51:27,043] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(fromIndiaWithLove)] from broker
> [id:5,host:a10.2.3.6port:9092] failed (kafka.client.ClientUtils$)
>
> java.nio.channels.ClosedChannelException
>
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>
> at
>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
>
> This is what my *consumer config* looks like:-
>
> *zookeeper.connect=10.2.3.4:2182 <http://10.2.3.4:2182>*
>
> *zookeeper.connection.timeout.ms
> <http://zookeeper.connection.timeout.ms>=1000000*
>
> *consumer.timeout.ms <http://consumer.timeout.ms>=-1*
>
> *group.id <http://group.id>=dp-mirrorMaker-test-datap1*
>
> *shallow.iterator.enable=true*
>
> *auto.create.topics.enable=true*
>
>
>
> I've used the default* producer.properties* in kafka/config/ which has
> these properteis:-
>
> *metadata.broker.list=localhost:9092*
>
>
> *producer.type=sync*
>
> *compression.codec=none*
>
>
> *serializer.class=kafka.serializer.DefaultEncoder*
>
>
> I'm running Mirror Maker via this command:-
>
>
>  /kafka_2.10-0.8.2.0/bin/kafka-run-class.sh kafka.tools.MirrorMaker
> --consumer.config ~/sourceCluster1Consumer.config  --num.streams 1
> --producer.config producer.properties --whitelist=".*"
>
> Regards,
>
> prabcs
>