Posted to users@kafka.apache.org by Aman Bazayev <ab...@talenttech.com> on 2013/05/28 20:31:14 UTC

MirrorMaker consumer does not use broker.list property

Hi,

I'd like to bump this issue:
https://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3CFA0E8A0482D176408729D604142248F319D22CA7%40EXCHANGE14.actuate.com%3E
as I'm encountering the same problem.

It seems that MirrorMaker does the following:
1) if zk.connect is defined for the consumer, the MirrorMaker script ignores
the broker.list value
2) if zk.connect is not defined for the consumer, MirrorMaker fails with
an error message:

[2013-05-28 11:24:57,457] INFO group1_ip-<hostname> Connecting to zookeeper instance at null (kafka.consumer.ZookeeperConsumerConnector)
[2013-05-28 11:24:57,457] INFO Initiating client connection, connectString=null sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@c820344 (org.apache.zookeeper.ZooKeeper)
[2013-05-28 11:24:57,461] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
Exception in thread "main" java.lang.NullPointerException
    at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
    at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
    at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
    at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
    at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:129)
    at kafka.tools.MirrorMaker$$anonfun$3.apply(MirrorMaker.scala:102)
    at kafka.tools.MirrorMaker$$anonfun$3.apply(MirrorMaker.scala:102)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.immutable.List.map(List.scala:45)
    at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:102)
    at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

The command I run is

$KAFKA_ROOT/bin/kafka-run-class.sh kafka.tools.MirrorMaker \
    --consumer.config ./mirror_consumer.properties \
    --producer.config ./mirror_producer.properties \
    --whitelist=".*" --num.streams 1

The ./mirror_consumer.properties file contains:

broker.list=1:localhost:59092,2:localhost:59093
groupid=group1
shallowiterator.enable=true

When broker.list is commented out and zk.connect is defined instead (or
when both are defined), mirroring happens successfully.
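
Given the failure above, a small pre-flight check on the consumer
properties file can catch a missing zk.connect before MirrorMaker starts.
The following is a minimal shell sketch, not part of Kafka: has_zk_connect
and run_mirror_maker are hypothetical helper names, and the check only
recognizes the plain key=value form of the property.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): succeeds only if the given
# properties file has an uncommented, non-empty zk.connect entry.
has_zk_connect() {
    grep -Eq '^[[:space:]]*zk\.connect[[:space:]]*=[[:space:]]*[^[:space:]]' "$1"
}

# Hypothetical wrapper: launch MirrorMaker with the same options as the
# command above, but refuse to start when zk.connect is absent.
run_mirror_maker() {
    consumer_config="$1"
    producer_config="$2"
    if ! has_zk_connect "$consumer_config"; then
        echo "zk.connect is missing from $consumer_config" >&2
        return 1
    fi
    "$KAFKA_ROOT/bin/kafka-run-class.sh" kafka.tools.MirrorMaker \
        --consumer.config "$consumer_config" \
        --producer.config "$producer_config" \
        --whitelist=".*" --num.streams 1
}
```

With the mirror_consumer.properties shown above (broker.list only),
run_mirror_maker would stop with the error message instead of reaching the
NullPointerException.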

Any leads?

Thank you,
Aman

Re: MirrorMaker consumer does not use broker.list property

Posted by Aman Bazayev <ab...@talenttech.com>.
Our design goal is to keep two geographically distributed Kafka clusters in
sync. Since the data travels across the Internet, it needs to be
encrypted in transit.

We are currently looking into using MirrorMaker, and I was hoping to use
SSL tunnels (implemented via stunnel) as a solution. The idea is that the
source ZooKeeper reports the list of source Kafka brokers, stunnel sets up
SSL tunnels to them, and the consumer is then pointed at the local endpoints
of the tunnels.
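
The local tunnel endpoints described above could be written as stunnel
client services. Below is a rough shell sketch that prints such a
configuration, assuming stunnel's client mode (the client, accept and
connect options). The helper name stunnel_client_section, the service
names, and the remote host names and port are placeholders, and the local
ports are assumed to match the localhost entries of the broker.list in my
first message. It only illustrates the tunnel endpoints and says nothing
about how the consumer discovers brokers.

```shell
#!/bin/sh
# Hypothetical helper: print one stunnel client service section.
# $1 = service name, $2 = local accept address, $3 = remote stunnel address
stunnel_client_section() {
    printf '[%s]\nclient = yes\naccept = %s\nconnect = %s\n\n' "$1" "$2" "$3"
}

# Placeholder mapping: one local endpoint per source broker.
stunnel_client_section kafka-1 localhost:59092 source-broker-1.example.com:9443
stunnel_client_section kafka-2 localhost:59093 source-broker-2.example.com:9443
```

The output can be redirected into a stunnel configuration file on the
mirror side.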

If you have ideas (better yet, solutions that have been used in production)
on how to achieve the goal described in the first paragraph, I'm all ears.

Regards,
Aman

On Wed, May 29, 2013 at 8:22 AM, Jun Rao <ju...@gmail.com> wrote:

> No, MirrorMaker uses our high-level consumer, which ignores broker.list. Is
> there a particular reason that you want the consumer in MirrorMaker to know
> the broker list?
>
> Thanks,
>
> Jun

Re: MirrorMaker consumer does not use broker.list property

Posted by Jun Rao <ju...@gmail.com>.
No, MirrorMaker uses our high-level consumer, which ignores broker.list. Is
there a particular reason that you want the consumer in MirrorMaker to know
the broker list?

Thanks,

Jun


On Tue, May 28, 2013 at 11:24 PM, Aman Bazayev <ab...@talenttech.com> wrote:

> Thanks for the response. What if I want to set broker.list for the
> consumer statically? Is there a way to do that?

Re: MirrorMaker consumer does not use broker.list property

Posted by Aman Bazayev <ab...@talenttech.com>.
Thanks for the response. What if I want to set broker.list for the
consumer statically? Is there a way to do that?

On Tue, May 28, 2013 at 9:05 PM, Jun Rao <ju...@gmail.com> wrote:

> Consumer only needs zk.connect, not broker.list.
>
> Thanks,
>
> Jun

Re: MirrorMaker consumer does not use broker.list property

Posted by Jun Rao <ju...@gmail.com>.
Consumer only needs zk.connect, not broker.list.

Thanks,

Jun

