Posted to user@storm.apache.org by Sa Li <sa...@gmail.com> on 2014/08/13 02:03:17 UTC
kafka.trident.ZkBrokerReader-brokers need refreshing
Hi, all,
I am reading messages from the producer and printing the "time" and
"userhostaddress" fields, but I get the following warning once in a while:
184.146.220.124
1403070062
24.79.224.172
1403070063
71.199.4.138
2644780 [Thread-16-spout0] WARN storm.artemis.kafka.KafkaUtils - No data
found in Kafka Partition partition_2
1403070064
172.4.221.83
2647191 [Thread-16-spout0] INFO storm.artemis.kafka.trident.ZkBrokerReader
- brokers need refreshing because 60000ms have expired
2647195 [Thread-16-spout0] INFO storm.artemis.kafka.DynamicBrokersReader -
Read partition info from zookeeper:
GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092, 1=
10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4=
10.100.70.128:9092}}
2648569 [Thread-8-$spoutcoord-spout0] INFO
storm.artemis.kafka.trident.ZkBrokerReader - brokers need refreshing
because 60000ms have expired
2648573 [Thread-8-$spoutcoord-spout0] INFO
storm.artemis.kafka.DynamicBrokersReader - Read partition info from
zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092, 1=
10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4=
10.100.70.128:9092}}
1403070068
24.85.157.225
1403070070
24.114.78.75
1403070070
76.219.198.176
1403070071
142.166.228.205
1403070071
76.66.155.166
1403070071
172.56.10.86
1403070071
...
It says the brokers need refreshing because 60000 ms have expired, but I
didn't configure 60000 ms anywhere, so I wonder what this issue is. In
addition, I started 3 Kafka brokers and 5 partitions for the topic "topictest":
topic: topictest partition: 0 leader: 1 replicas: 1,3,2 isr: 1,2,3
topic: topictest partition: 1 leader: 1 replicas: 2,1,3 isr: 1,2,3
topic: topictest partition: 2 leader: 1 replicas: 3,2,1 isr: 1,2,3
topic: topictest partition: 3 leader: 1 replicas: 1,2,3 isr: 1,2,3
topic: topictest partition: 4 leader: 1 replicas: 2,3,1 isr: 1,2,3
What I don't understand is that I don't see a BrokerReader for the brokers
on ports 9093 and 9094. Also, what does "No data found in Kafka Partition
partition_2" mean?
thanks
Alec
Re: kafka.trident.ZkBrokerReader-brokers need refreshing
Posted by Sa Li <sa...@gmail.com>.
Is that 60000 ms an internal setting? Where can I see this configuration?
thanks
Alec
Re: kafka.trident.ZkBrokerReader-brokers need refreshing
Posted by Sa Li <sa...@gmail.com>.
Thanks, Parth, that makes sense.
Re: kafka.trident.ZkBrokerReader-brokers need refreshing
Posted by Parth Brahmbhatt <pb...@hortonworks.com>.
60000 ms is an internal setting for the ZooKeeper broker refresh; see
https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java#L30.
Thanks
Parth
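The linked line defines ZkHosts.refreshFreqSecs, which defaults to 60
seconds, hence the 60000 ms in the log; the INFO messages are normal
periodic behavior, not errors. Below is a minimal self-contained sketch of
that timer logic (not the actual storm-kafka source, just an illustration
of the check the spout performs):

```java
// Sketch of the expiry check behind "brokers need refreshing because
// 60000ms have expired". Not storm-kafka code; names are illustrative.
class BrokerRefreshTimer {
    // Mirrors ZkHosts.refreshFreqSecs (default 60 s), converted to ms.
    private final long refreshFreqMs;
    private long lastRefreshTimeMs;

    BrokerRefreshTimer(int refreshFreqSecs, long nowMs) {
        this.refreshFreqMs = refreshFreqSecs * 1000L;
        this.lastRefreshTimeMs = nowMs;
    }

    // True when the cached broker/partition info is stale and should be
    // re-read from ZooKeeper; resets the timer when it fires.
    boolean needsRefresh(long nowMs) {
        if (nowMs - lastRefreshTimeMs > refreshFreqMs) {
            lastRefreshTimeMs = nowMs;
            return true;
        }
        return false;
    }
}
```

Since refreshFreqSecs is a public field on ZkHosts, you can raise it on the
ZkHosts instance you pass into the spout config if the refresh chatter in
the logs bothers you.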
--
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to
which it is addressed and may contain information that is confidential,
privileged and exempt from disclosure under applicable law. If the reader
of this message is not the intended recipient, you are hereby notified that
any printing, copying, dissemination, distribution, disclosure or
forwarding of this communication is strictly prohibited. If you have
received this communication in error, please contact the sender immediately
and delete it from your system. Thank You.
Re: kafka.trident.ZkBrokerReader-brokers need refreshing
Posted by Sa Li <sa...@gmail.com>.
Hi, Siddharth
I only used a Trident topology for the Kafka spout, since I thought I could
easily add .each functions to parse the stream. I do add two versions of
the storm-kafka package in my pom; see below:
<!-- Storm-Kafka compiled -->
<dependency>
    <artifactId>storm-kafka</artifactId>
    <groupId>org.apache.storm</groupId>
    <version>0.9.2-incubating</version>
    <!--
    <scope>compile</scope>
    -->
    <!-- exclude the zookeeper package from storm-kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.0-wip16a-scala292</version>
    <!-- exclude the zookeeper package from storm-kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
When I first ran the KafkaSpout, I got stuck for a while. After talking to
developers here back and forth, I realized that version conflicts are
really an issue to pay attention to: you must make the ZooKeeper, Storm,
and Kafka versions consistent, otherwise you will have problems, or you
need to exclude the conflicting package in the pom.
Thanks
Alec
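An alternative to repeating the zookeeper exclusion under every storm-kafka
dependency (a sketch, not from this thread; the version shown is
illustrative, so pick the one your cluster actually runs) is to pin a
single ZooKeeper version for the whole build with Maven's
dependencyManagement, so every transitive zookeeper resolves to the same
artifact:

```xml
<!-- Illustrative: force one ZooKeeper version across all transitive
     dependencies instead of excluding it from each storm-kafka artifact. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.5</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```

Running `mvn dependency:tree` afterwards shows which version each path
resolved to, which is a quick way to confirm the conflict is gone.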
Re: kafka.trident.ZkBrokerReader-brokers need refreshing
Posted by siddharth ubale <si...@gmail.com>.
Hi,
Just curious, did you face any issue using the Kafka spout when you did not
use Trident? Are you also able to use the KafkaSpout packaged with Storm?
I am asking because I am unable to use KafkaSpout(SpoutConfig) to read from
a Kafka topic. I am using Kafka 8.1.1 and Storm 9.0.1. Can you let me know
about any issues you faced?
I get no error when I submit my program, only a never-ending sequence of
the following:
68582 [Thread-33-words] INFO backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
68588 [Thread-33-words] INFO backtype.storm.daemon.task - Emitting: words
__metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@a92bbf9> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]>
#<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024,
population=1}]> #<DataPoint [kafkaOffset = {totalLatestTime=0,
totalSpoutLag=0, totalLatestEmittedOffset=0}]> #<DataPoint
[__transfer-count = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
[__emit-count = {}]>]]
68649 [Thread-33-words] INFO storm.kafka.ZkCoordinator - Refreshing
partition manager connections
68649 [Thread-34-words] INFO storm.kafka.ZkCoordinator - Refreshing
partition manager connections
68664 [Thread-33-words] INFO storm.kafka.ZkCoordinator - Deleted partition
managers: []
68664 [Thread-33-words] INFO storm.kafka.ZkCoordinator - New partition
managers: []
68664 [Thread-33-words] INFO storm.kafka.ZkCoordinator - Finished
refreshing
68665 [Thread-34-words] INFO storm.kafka.ZkCoordinator - Deleted partition
managers: []
68665 [Thread-34-words] INFO storm.kafka.ZkCoordinator - New partition
managers: []
68665 [Thread-34-words] INFO storm.kafka.ZkCoordinator - Finished
refreshing
Thanks,
Siddharth