Posted to users@kafka.apache.org by Sotaro Kimura <rf...@gmail.com> on 2014/04/09 00:08:55 UTC

Re: How produce to remote Kafka Cluster over ssh tunneling

Hi, all.

Sorry for my late reply.

I tried advertised.host.name/advertised.port,
but the producer failed to fetch metadata from the Kafka broker.

The configuration is below.
- Kafka cluster in the data center (each machine has a global IP address).
The firewall allows only the ssh port (22).
ex) 192.168.100.100
- The Kafka producer runs outside the data center.
ex) 172.16.0.100

The additional broker settings are below.
advertised.host.name=localhost
advertised.port=19092
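
For reference, the relevant part of the broker's server.properties then looks
roughly like this (just a sketch; port=9092 is assumed from the forwarding below):
========================
# port the broker actually binds to
port=9092
# host and port published to ZooKeeper for clients to use
advertised.host.name=localhost
advertised.port=19092
========================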

The Kafka producer operations are below.

- SSH port forwarding
========================
# ssh root@192.168.100.100 -L 19092:192.168.100.100:9092 -L 12181:192.168.100.100:2181
// SSH port forwarding for the Kafka broker and ZooKeeper.
========================
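
The same forwardings can also be kept in ~/.ssh/config so the tunnel is easier
to reopen (a sketch, assuming OpenSSH; the host alias kafka-dc is only an
example name):
========================
Host kafka-dc
    HostName 192.168.100.100
    User root
    # Kafka broker and ZooKeeper forwardings
    LocalForward 19092 192.168.100.100:9092
    LocalForward 12181 192.168.100.100:2181
========================
With this entry the tunnel is opened with just "ssh kafka-dc".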

- Producer data put (failed)
========================
# ./kafka-topics.sh --create --zookeeper localhost:12181
--replication-factor 1 --partitions 3 --topic TestTopic
Created topic "TestTopic".
// Creating the topic over ssh port forwarding succeeded.
# ./kafka-topics.sh --list --zookeeper localhost:12181
TestTopic
// Getting the topic list over ssh port forwarding succeeded.
# ./kafka-console-producer.sh --broker-list localhost:19092 --topic
TestTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
Test
[2014-04-06 20:42:01,352] WARN Error while fetching metadata
[{TopicMetadata for topic TestTopic ->No partition metadata for topic
TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
[TestTopic]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,364] WARN Error while fetching metadata
[{TopicMetadata for topic TestTopic ->No partition metadata for topic
TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
[TestTopic]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,365] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: TestTopic
(kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,476] WARN Error while fetching metadata
[{TopicMetadata for topic TestTopic ->No partition metadata for topic
TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
[TestTopic]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,481] WARN Error while fetching metadata
[{TopicMetadata for topic TestTopic ->No partition metadata for topic
TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
[TestTopic]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,481] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: TestTopic
(kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,589] WARN Error while fetching metadata
[{TopicMetadata for topic TestTopic ->No partition metadata for topic
TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
[TestTopic]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,595] WARN Error while fetching metadata
[{TopicMetadata for topic TestTopic ->No partition metadata for topic
TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
[TestTopic]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,595] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: TestTopic
(kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,709] WARN Error while fetching metadata
[{TopicMetadata for topic TestTopic ->No partition metadata for topic
TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
[TestTopic]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,719] WARN Error while fetching metadata
[{TopicMetadata for topic TestTopic ->No partition metadata for topic
TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
[TestTopic]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,719] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: TestTopic
(kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,826] WARN Error while fetching metadata
[{TopicMetadata for topic TestTopic ->No partition metadata for topic
TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
[TestTopic]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,829] ERROR Failed to send requests for topics
TestTopic with correlation ids in [0,8]
(kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,830] ERROR Error in handling batch of 2 events
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:526)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
========================


If the advertised settings are not defined,
the producer connection is refused (error below),
so I think the advertised settings are taking effect.

========================
# ./kafka-console-producer.sh --broker-list localhost:19092 --topic
TestTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
Test
[2014-04-08 21:11:44,231] ERROR Producer connection to localhost:9092
unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
========================
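
One way to check what the broker is actually advertising is to read its
registration from ZooKeeper over the same tunnel (a sketch; it assumes the
zookeeper-shell.sh wrapper in the Kafka bin directory and broker id 0):
========================
# ./zookeeper-shell.sh localhost:12181
get /brokers/ids/0
// the JSON in the reply contains the "host" and "port" the broker advertises
========================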

Does anyone know the cause of this error?

Regards.




2014-03-20 8:00 GMT+09:00 Sotaro Kimura <rf...@gmail.com>:

> Thanks for the quick reply.
>
>
> So the settings are advertised.host.name/advertised.port.
> I had only checked the producer settings, so I hadn't found these properties.
>
> I will try it. Thank you.
>
>
> 2014-03-20 7:50 GMT+09:00 Joe Stein <jo...@stealth.ly>:
>
> With 0.8.1 update these two properties in the server.properties config of
>> the broker to be 172.16.0.100 and 19092 respectively
>>
>> # Hostname the broker will advertise to producers and consumers. If not
>> set, it uses the
>> # value for "host.name" if configured.  Otherwise, it will use the value
>> returned from
>> # java.net.InetAddress.getCanonicalHostName().
>> #advertised.host.name=<hostname routable by clients>
>>
>> # The port to publish to ZooKeeper for clients to use. If this is not set,
>> # it will publish the same port that the broker binds to.
>> #advertised.port=<port accessible by clients>
>>
>> /*******************************************
>>  Joe Stein
>>  Founder, Principal Consultant
>>  Big Data Open Source Security LLC
>>  http://www.stealth.ly
>>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> ********************************************/
>>
>>
>> On Wed, Mar 19, 2014 at 6:39 PM, Sotaro Kimura <rf...@gmail.com>
>> wrote:
>>
>> > Hi, all.
>> >
>> > I'm trying to produce logs to a remote Kafka cluster over SSH tunneling.
>> >
>> > The configuration is below.
>> > - Kafka cluster in the data center (each machine has a global IP address).
>> > The firewall allows only the ssh port (22).
>> > ex) 192.168.100.100
>> > - The Kafka producer runs outside the data center.
>> > ex) 172.16.0.100
>> >
>> > I tried producing over SSH tunneling.
>> > The producer succeeded in getting metadata from the brokers,
>> > but the metadata contains an address which the producer can't access directly.
>> >
>> > The details are below.
>> > 1. SSH from the producer machine to the data center:
>> > 172.16.0.100 > 192.168.100.100:22
>> > 2. Create an SSH tunnel over that connection:
>> > 172.16.0.100's localhost:19092 to 192.168.100.100:9092 (the Kafka listen
>> > port).
>> > 3. The producer connects to localhost:19092 and
>> > gets metadata from the brokers (192.168.100.100:9092).
>> > 4. The producer fails to connect to the brokers (192.168.100.100:9092).
>> >
>> > In this situation, how can I produce to the remote Kafka cluster?
>> >
>> > --
>> > #################################
>> > 木村 宗太郎 Sotaro Kimura
>> > <rf...@gmail.com>
>> > #################################
>> >
>>
>
>
>
> --
> #################################
> 木村 宗太郎 Sotaro Kimura
> <rf...@gmail.com>
> #################################
>



-- 
#################################
木村 宗太郎 Sotaro Kimura
<rf...@gmail.com>
#################################

Re: How produce to remote Kafka Cluster over ssh tunneling

Posted by Sotaro Kimura <rf...@gmail.com>.
Thanks for the reply.

That is certainly not a good idea;
other clients can't reach the Kafka cluster with these settings:

advertised.host.name=localhost
advertised.port=19092

For the staging environment, I will set up a proxy as you suggested.

For now, I have succeeded in producing messages over the ssh tunnel.
The settings are below.

- Additional broker settings (the advertised address now matches what the producer sees through the tunnel):
advertised.host.name=localhost
advertised.port=9092

- SSH port forwarding
========================
# ssh root@192.168.100.100 -L 9092:192.168.100.100:9092 -L 12181:192.168.100.100:2181
// Kafka port forwarding changed from 19092 to 9092.
========================
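
The tunnel can also be kept in the background without opening a remote shell
(a sketch; -f and -N are standard OpenSSH options):
========================
# ssh -f -N root@192.168.100.100 -L 9092:192.168.100.100:9092 -L 12181:192.168.100.100:2181
========================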

- Producer data put
========================
# ./kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
TestMessage
========================

- Consumer data get
========================
# ./kafka-console-consumer.sh --zookeeper localhost:12181 --topic TestTopic --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
TestMessage
========================

Thanks for your help.

Regards.




2014-04-10 0:44 GMT+09:00 Roger Hoover <ro...@gmail.com>:

> I think setting these is not a good idea because they only apply to the
> specific client where you've set up the tunnel.  Other clients cannot use
> these settings:
>
> advertised.host.name=localhost
> advertised.port=19092
>
> You probably need to figure out another way such as
> 1) Setting up a local mapping on your producer that matches the remote
> host:port
>     - Edit /etc/hosts to map 172.16.0.100 to localhost
>     - Forward localhost:9092 to localhost:19092
> 2) Change the producer client to translate the broker metadata it gets back
> 3) Create a proxy on the remote side that does all the talking to Kafka.
>  Change your producer to only talk to the proxy



-- 
#################################
木村 宗太郎 Sotaro Kimura
<rf...@gmail.com>
#################################

Re: How produce to remote Kafka Cluster over ssh tunneling

Posted by Roger Hoover <ro...@gmail.com>.
I think setting these is not a good idea because they only apply to the
specific client where you've set up the tunnel.  Other clients cannot use
these settings:

advertised.host.name=localhost
advertised.port=19092

You probably need to figure out another way, such as:
1) Setting up a local mapping on your producer that matches the remote
host:port (see the sketch after this list)
    - Edit /etc/hosts to map 172.16.0.100 to localhost
    - Forward localhost:9092 to localhost:19092
2) Change the producer client to translate the broker metadata it gets back
3) Create a proxy on the remote side that does all the talking to Kafka.
 Change your producer to only talk to the proxy
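
A sketch of the idea behind option 1 (note that /etc/hosts maps names rather
than raw IPs, so this assumes the broker is set to advertise a hostname;
kafka1.dc.example below is only a made-up example):

# /etc/hosts on the producer machine
127.0.0.1   kafka1.dc.example

# then forward the advertised port to the broker through the ssh tunnel, e.g.
# ssh root@192.168.100.100 -L 9092:192.168.100.100:9092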




Re: How produce to remote Kafka Cluster over ssh tunneling

Posted by Sotaro Kimura <rf...@gmail.com>.
I checked them.

- controller.log
After the Kafka broker started,
it keeps outputting the log below;
the error repeats continuously.
=======
[2014-04-09 23:55:07,709] ERROR [Controller-0-to-broker-0-send-thread],
Controller 0's connection to broker id:0,host:localhost,port:19092 was
unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at
kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
        at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
        at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-09 23:55:08,009] ERROR [Controller-0-to-broker-0-send-thread],
Controller 0 epoch 7 failed to send UpdateMetadata request with correlation
id 3 to broker id:0,host:localhost,port:19092. Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
        at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
=======
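
One quick check on the broker machine is whether anything actually listens on
the advertised localhost:19092 that the controller is trying to reach (a
sketch; assumes Linux netstat):
=======
# netstat -nltp | grep 19092
// no output here would explain the "Connection refused" above: the broker
// binds 9092, while 19092 only exists on the producer side of the ssh tunnel
=======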

- state change log
When the Kafka broker started,
the log below was output for each partition.
=======
[2014-04-09 23:53:43,341] TRACE Controller 0 epoch 7 started leader
election for partition [TestTopic,0] (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition
[TestTopic,0] is alive. Live brokers are: [Set()], Assigned replicas are:
[List(0)]
        at
kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
        at
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:335)
        at
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:184)
        at
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:98)
        at
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:95)
        at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
        at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
        at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
        at scala.collection.Iterator$class.foreach(Iterator.scala:772)
        at
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
        at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
        at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
        at
kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:95)
        at
kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
        at
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:311)
        at
kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:161)
        at
kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
        at
kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:49)
        at
kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
        at
kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at
kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:47)
        at
kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:630)
        at
kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
        at
kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at
kafka.controller.KafkaController.startup(KafkaController.scala:626)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:96)
        at
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:46)
        at kafka.Kafka.main(Kafka.scala)
=======

Regards.


2014-04-09 13:03 GMT+09:00 Jun Rao <ju...@gmail.com>:

> Any error in the broker, controller and state change log?
>
> Thanks,
>
> Jun
>
>
> On Tue, Apr 8, 2014 at 3:08 PM, Sotaro Kimura <rf...@gmail.com> wrote:
>
> > Hi, all.
> >
> > Sorry for my late reply.
> >
> > I tried advertised.host.name/advertised.port.
> > But producer failed fetching metadata from kafka broker.
> >
> > Configuration is below.
> > - Kafka cluster in DataCenter(each machine has Grobal IP Address).
> > Firewall allows only ssh port(22).
> > ex) 192.168.100.100
> > - Kafka producer exists out of DataCenter.
> > ex) 172.16.0.100
> >
> > Broker additional setting is below.
> > advertised.host.name=localhost
> > advertised.port=19092
> >
> > Kafka producer operation is below.
> >
> > - SSH port fowarding
> > ========================
> > # ssh root@192.168.100.100 -L 19092:192.168.100.100:9092 -L 12181:
> > 192.168.100.100:2181
> > // Kafka broker and zookeeper ssh port fowarding.
> > ========================
> >
> > - Producer data put(failed)
> > ========================
> > # ./kafka-topics.sh --create --zookeeper localhost:12181
> > --replication-factor 1 --partitions 3 --topic TestTopic
> > Created topic "TestTopic".
> > // Succeed create topic over ssh port fowarding.
> > # ./kafka-topics.sh --list --zookeeper localhost:12181
> > TestTopic
> > // Succeed get topic list over ssh port fowarding.
> > # ./kafka-console-producer.sh --broker-list localhost:19092 --topic
> > TestTopic
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> further
> > details.
> > Test
> > [2014-04-06 20:42:01,352] WARN Error while fetching metadata
> > [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> > TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> > [TestTopic]: class kafka.common.LeaderNotAvailableException
> > (kafka.producer.BrokerPartitionInfo)
> > [2014-04-06 20:42:01,364] WARN Error while fetching metadata
> > [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> > TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> > [TestTopic]: class kafka.common.LeaderNotAvailableException
> > (kafka.producer.BrokerPartitionInfo)
> > [2014-04-06 20:42:01,365] ERROR Failed to collate messages by topic,
> > partition due to: Failed to fetch topic metadata for topic: TestTopic
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-04-06 20:42:01,476] WARN Error while fetching metadata
> > [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> > TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> > [TestTopic]: class kafka.common.LeaderNotAvailableException
> > (kafka.producer.BrokerPartitionInfo)
> > [2014-04-06 20:42:01,481] WARN Error while fetching metadata
> > [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> > TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> > [TestTopic]: class kafka.common.LeaderNotAvailableException
> > (kafka.producer.BrokerPartitionInfo)
> > [2014-04-06 20:42:01,481] ERROR Failed to collate messages by topic,
> > partition due to: Failed to fetch topic metadata for topic: TestTopic
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-04-06 20:42:01,589] WARN Error while fetching metadata
> > [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> > TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> > [TestTopic]: class kafka.common.LeaderNotAvailableException
> > (kafka.producer.BrokerPartitionInfo)
> > [2014-04-06 20:42:01,595] WARN Error while fetching metadata
> > [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> > TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> > [TestTopic]: class kafka.common.LeaderNotAvailableException
> > (kafka.producer.BrokerPartitionInfo)
> > [2014-04-06 20:42:01,595] ERROR Failed to collate messages by topic,
> > partition due to: Failed to fetch topic metadata for topic: TestTopic
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-04-06 20:42:01,709] WARN Error while fetching metadata
> > [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> > TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> > [TestTopic]: class kafka.common.LeaderNotAvailableException
> > (kafka.producer.BrokerPartit
> > ionInfo)
> > [2014-04-06 20:42:01,719] WARN Error while fetching metadata
> > [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> > TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> > [TestTopic]: class kafka.common.LeaderNotAvailableException
> > (kafka.producer.BrokerPartitionInfo)
> > [2014-04-06 20:42:01,719] ERROR Failed to collate messages by topic,
> > partition due to: Failed to fetch topic metadata for topic: TestTopic
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-04-06 20:42:01,826] WARN Error while fetching metadata
> > [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> > TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> > [TestTopic]: class kafka.common.LeaderNotAvailableException
> > (kafka.producer.BrokerPartitionInfo)
> > [2014-04-06 20:42:01,829] ERROR Failed to send requests for topics
> > TestTopic with correlation ids in [0,8]
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-04-06 20:42:01,830] ERROR Error in handling batch of 2 events
> > (kafka.producer.async.ProducerSendThread)
> > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> > at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> > at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> > ========================
> >
> >
> > If the advertised settings are not defined, the producer connection is
> > refused (see the error below), so I think the advertised settings are
> > taking effect.
> >
> > ========================
> > # ./kafka-console-producer.sh --broker-list localhost:19092 --topic
> > TestTopic
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> > Test
> > [2014-04-08 21:11:44,231] ERROR Producer connection to localhost:9092
> > unsuccessful (kafka.producer.SyncProducer)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > ========================
> >
> > Does anyone see what is causing this error?
> >
> > regards.
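
One way to double-check what the advertised settings actually put into the
cluster metadata is to read the broker's registration from ZooKeeper over the
same tunnel. A rough sketch, assuming broker id 0 and that zookeeper-shell.sh
accepts a command after the connect string (otherwise run it interactively):

========================
# run from the producer machine, over the existing 12181 tunnel
./zookeeper-shell.sh localhost:12181 get /brokers/ids/0
# the returned JSON should contain "host":"localhost" and "port":19092 once
# the advertised.* settings have been picked up by the broker
========================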
> >
> >
> >
> >
> > 2014-03-20 8:00 GMT+09:00 Sotaro Kimura <rf...@gmail.com>:
> >
> > > Thanks for the quick reply.
> > >
> > >
> > > So those are the advertised.host.name/advertised.port settings.
> > > I had only checked the producer settings, so I had missed these properties.
> > >
> > > I will try it. Thank you.
> > >
> > >
> > > 2014-03-20 7:50 GMT+09:00 Joe Stein <jo...@stealth.ly>:
> > >
> > >> With 0.8.1, update these two properties in the server.properties config
> > >> of the broker to be 172.16.0.100 and 19092 respectively:
> > >>
> > >> # Hostname the broker will advertise to producers and consumers. If not
> > >> set, it uses the
> > >> # value for "host.name" if configured.  Otherwise, it will use the value
> > >> returned from
> > >> # java.net.InetAddress.getCanonicalHostName().
> > >> #advertised.host.name=<hostname routable by clients>
> > >>
> > >> # The port to publish to ZooKeeper for clients to use. If this is not set,
> > >> # it will publish the same port that the broker binds to.
> > >> #advertised.port=<port accessible by clients>
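
Spelled out with the example addresses used in this thread, that suggestion
amounts to roughly the following two lines in server.properties (a sketch only;
use whatever address and port your producers actually reach the broker on):

========================
# producer-facing address and port from the example above
advertised.host.name=172.16.0.100
advertised.port=19092
========================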
> > >>
> > >> /*******************************************
> > >>  Joe Stein
> > >>  Founder, Principal Consultant
> > >>  Big Data Open Source Security LLC
> > >>  http://www.stealth.ly
> > >>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > >> ********************************************/
> > >>
> > >>
> > >> On Wed, Mar 19, 2014 at 6:39 PM, Sotaro Kimura <rf...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi, all.
> > >> >
> > >> > I'm trying to produce logs to a remote Kafka cluster over SSH tunneling.
> > >> >
> > >> > Configuration is below.
> > >> > - Kafka cluster in the data center (each machine has a global IP address).
> > >> > The firewall allows only the SSH port (22).
> > >> > ex) 192.168.100.100
> > >> > - Kafka producer exists out of DataCenter.
> > >> > ex) 172.16.0.100
> > >> >
> > >> > I tried producing over SSH tunneling.
> > >> > The producer succeeded in getting metadata from the brokers,
> > >> > but the metadata contains an address the producer can't access directly.
> > >> >
> > >> > Details are below.
> > >> > 1. SSH from the producer machine to the data center.
> > >> > 172.16.0.100 > 192.168.100.100:22
> > >> > 2. Create an SSH tunnel over that connection:
> > >> > 172.16.0.100's localhost:19092 to 192.168.100.100:9092 (Kafka receive port).
> > >> > 3. The producer connects to localhost:19092.
> > >> > The producer gets metadata from the brokers (192.168.100.100:9092).
> > >> > 4. The producer fails to connect to the brokers (192.168.100.100:9092).
> > >> >
> > >> > In this situation, how can I produce to the remote Kafka cluster?
> > >> >
> > >> > --
> > >> > #################################
> > >> > 木村 宗太郎 Sotaro Kimura
> > >> > <rf...@gmail.com>
> > >> > #################################
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > #################################
> > > 木村 宗太郎 Sotaro Kimura
> > > <rf...@gmail.com>
> > > #################################
> > >
> >
> >
> >
> > --
> > #################################
> > 木村 宗太郎 Sotaro Kimura
> > <rf...@gmail.com>
> > #################################
> >
>



-- 
#################################
木村 宗太郎 Sotaro Kimura
<rf...@gmail.com>
#################################

Re: How produce to remote Kafka Cluster over ssh tunneling

Posted by Jun Rao <ju...@gmail.com>.
Any errors in the broker, controller, and state change logs?
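
A quick way to look for them, assuming the default log4j.properties layout
(server.log, controller.log, and state-change.log written under the broker's
logs directory), is something like:

========================
cd /path/to/kafka/logs
grep -iE "error|exception" server.log        # broker log
grep -iE "error|exception" controller.log    # controller log
grep -iE "error|exception" state-change.log  # state change log
========================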

Thanks,

Jun


On Tue, Apr 8, 2014 at 3:08 PM, Sotaro Kimura <rf...@gmail.com> wrote:

> Hi, all.
>
> Sorry for my late reply.
>
> I tried advertised.host.name/advertised.port.
> But producer failed fetching metadata from kafka broker.
>
> Configuration is below.
> - Kafka cluster in DataCenter(each machine has Grobal IP Address).
> Firewall allows only ssh port(22).
> ex) 192.168.100.100
> - Kafka producer exists out of DataCenter.
> ex) 172.16.0.100
>
> Broker additional setting is below.
> advertised.host.name=localhost
> advertised.port=19092
>
> Kafka producer operation is below.
>
> - SSH port fowarding
> ========================
> # ssh root@192.168.100.100 -L 19092:192.168.100.100:9092 -L 12181:
> 192.168.100.100:2181
> // Kafka broker and zookeeper ssh port fowarding.
> ========================
>
> - Producer data put(failed)
> ========================
> # ./kafka-topics.sh --create --zookeeper localhost:12181
> --replication-factor 1 --partitions 3 --topic TestTopic
> Created topic "TestTopic".
> // Succeed create topic over ssh port fowarding.
> # ./kafka-topics.sh --list --zookeeper localhost:12181
> TestTopic
> // Succeed get topic list over ssh port fowarding.
> # ./kafka-console-producer.sh --broker-list localhost:19092 --topic
> TestTopic
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> Test
> [2014-04-06 20:42:01,352] WARN Error while fetching metadata
> [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> [TestTopic]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-04-06 20:42:01,364] WARN Error while fetching metadata
> [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> [TestTopic]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-04-06 20:42:01,365] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: TestTopic
> (kafka.producer.async.DefaultEventHandler)
> [2014-04-06 20:42:01,476] WARN Error while fetching metadata
> [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> [TestTopic]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-04-06 20:42:01,481] WARN Error while fetching metadata
> [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> [TestTopic]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-04-06 20:42:01,481] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: TestTopic
> (kafka.producer.async.DefaultEventHandler)
> [2014-04-06 20:42:01,589] WARN Error while fetching metadata
> [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> [TestTopic]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-04-06 20:42:01,595] WARN Error while fetching metadata
> [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> [TestTopic]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-04-06 20:42:01,595] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: TestTopic
> (kafka.producer.async.DefaultEventHandler)
> [2014-04-06 20:42:01,709] WARN Error while fetching metadata
> [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> [TestTopic]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-04-06 20:42:01,719] WARN Error while fetching metadata
> [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> [TestTopic]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-04-06 20:42:01,719] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: TestTopic
> (kafka.producer.async.DefaultEventHandler)
> [2014-04-06 20:42:01,826] WARN Error while fetching metadata
> [{TopicMetadata for topic TestTopic ->No partition metadata for topic
> TestTopic due to kafka.common.LeaderNotAvailableException}] for topic
> [TestTopic]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-04-06 20:42:01,829] ERROR Failed to send requests for topics
> TestTopic with correlation ids in [0,8]
> (kafka.producer.async.DefaultEventHandler)
> [2014-04-06 20:42:01,830] ERROR Error in handling batch of 2 events
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> ========================
>
>
> If the advertised settings are not defined, the producer connection is
> refused (see the error below), so I think the advertised settings are
> taking effect.
>
> ========================
> # ./kafka-console-producer.sh --broker-list localhost:19092 --topic
> TestTopic
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> Test
> [2014-04-08 21:11:44,231] ERROR Producer connection to localhost:9092
> unsuccessful (kafka.producer.SyncProducer)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> ========================
>
> Does anyone see what is causing this error?
>
> regards.
>
>
>
>
> 2014-03-20 8:00 GMT+09:00 Sotaro Kimura <rf...@gmail.com>:
>
> > Thanks for the quick reply.
> >
> >
> > So those are the advertised.host.name/advertised.port settings.
> > I had only checked the producer settings, so I had missed these properties.
> >
> > I will try it. Thank you.
> >
> >
> > 2014-03-20 7:50 GMT+09:00 Joe Stein <jo...@stealth.ly>:
> >
> >> With 0.8.1, update these two properties in the server.properties config of
> >> the broker to be 172.16.0.100 and 19092 respectively:
> >>
> >> # Hostname the broker will advertise to producers and consumers. If not
> >> set, it uses the
> >> # value for "host.name" if configured.  Otherwise, it will use the value
> >> returned from
> >> # java.net.InetAddress.getCanonicalHostName().
> >> #advertised.host.name=<hostname routable by clients>
> >>
> >> # The port to publish to ZooKeeper for clients to use. If this is not set,
> >> # it will publish the same port that the broker binds to.
> >> #advertised.port=<port accessible by clients>
> >>
> >> /*******************************************
> >>  Joe Stein
> >>  Founder, Principal Consultant
> >>  Big Data Open Source Security LLC
> >>  http://www.stealth.ly
> >>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> >> ********************************************/
> >>
> >>
> >> On Wed, Mar 19, 2014 at 6:39 PM, Sotaro Kimura <rf...@gmail.com>
> >> wrote:
> >>
> >> > Hi, all.
> >> >
> >> > I'm trying to produce logs to a remote Kafka cluster over SSH tunneling.
> >> >
> >> > Configuration is below.
> >> > - Kafka cluster in the data center (each machine has a global IP address).
> >> > The firewall allows only the SSH port (22).
> >> > ex) 192.168.100.100
> >> > - Kafka producer exists out of DataCenter.
> >> > ex) 172.16.0.100
> >> >
> >> > I tried producing over SSH tunneling.
> >> > The producer succeeded in getting metadata from the brokers,
> >> > but the metadata contains an address the producer can't access directly.
> >> >
> >> > Details are below.
> >> > 1. SSH from the producer machine to the data center.
> >> > 172.16.0.100 > 192.168.100.100:22
> >> > 2. Create an SSH tunnel over that connection:
> >> > 172.16.0.100's localhost:19092 to 192.168.100.100:9092 (Kafka receive port).
> >> > 3. The producer connects to localhost:19092.
> >> > The producer gets metadata from the brokers (192.168.100.100:9092).
> >> > 4. The producer fails to connect to the brokers (192.168.100.100:9092).
> >> >
> >> > In this situation, how can I produce to the remote Kafka cluster?
> >> >
> >> > --
> >> > #################################
> >> > 木村 宗太郎 Sotaro Kimura
> >> > <rf...@gmail.com>
> >> > #################################
> >> >
> >>
> >
> >
> >
> > --
> > #################################
> > 木村 宗太郎 Sotaro Kimura
> > <rf...@gmail.com>
> > #################################
> >
>
>
>
> --
> #################################
> 木村 宗太郎 Sotaro Kimura
> <rf...@gmail.com>
> #################################
>