Posted to user@storm.apache.org by Sa Li <sa...@gmail.com> on 2014/08/13 02:03:17 UTC

kafka.trident.ZkBrokerReader-brokers need refreshing

Hi, All

I am reading messages from a producer and printing the "time" and
"userhostaddress" fields, but I get warnings like the following once in a while:

184.146.220.124
1403070062
24.79.224.172
1403070063
71.199.4.138
2644780 [Thread-16-spout0] WARN  storm.artemis.kafka.KafkaUtils - No data found in Kafka Partition partition_2
1403070064
172.4.221.83
2647191 [Thread-16-spout0] INFO  storm.artemis.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
2647195 [Thread-16-spout0] INFO  storm.artemis.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092, 1=10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4=10.100.70.128:9092}}
2648569 [Thread-8-$spoutcoord-spout0] INFO  storm.artemis.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
2648573 [Thread-8-$spoutcoord-spout0] INFO  storm.artemis.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092, 1=10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4=10.100.70.128:9092}}
1403070068
24.85.157.225
1403070070
24.114.78.75
1403070070
76.219.198.176
1403070071
142.166.228.205
1403070071
76.66.155.166
1403070071
172.56.10.86
1403070071

.
.
.
The log says brokers need refreshing because 60000ms have expired, but I have
not configured 60000ms anywhere, so I wonder what this means. In addition, I
started 3 Kafka brokers and created 5 partitions for the topic "topictest":

topic: topictest        partition: 0    leader: 1       replicas: 1,3,2 isr: 1,2,3
topic: topictest        partition: 1    leader: 1       replicas: 2,1,3 isr: 1,2,3
topic: topictest        partition: 2    leader: 1       replicas: 3,2,1 isr: 1,2,3
topic: topictest        partition: 3    leader: 1       replicas: 1,2,3 isr: 1,2,3
topic: topictest        partition: 4    leader: 1       replicas: 2,3,1 isr: 1,2,3

What I don't understand is why I see no BrokerReader entries for the brokers
on 9093 and 9094. Also, what does "No data found in Kafka Partition
partition_2" mean?


thanks

Alec
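
A possible reading of the partition map above, offered as a sketch rather than
a confirmed answer from this thread: the GlobalPartitionInformation log line
lists one host:port per partition, and the topic listing shows leader 1 for all
five partitions. Assuming the spout records only each partition's current
leader, and assuming broker id 1 is the broker listening on port 9092, the
brokers on 9093 and 9094 would not show up until one of them becomes a leader.
The class LeaderMapSketch and its methods are hypothetical helpers, not part of
storm-kafka; they only parse the listing and collect the distinct leaders.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of storm-kafka or Kafka.
final class LeaderMapSketch {
    // Matches the "partition: N ... leader: M" part of each listing line.
    private static final Pattern LINE =
            Pattern.compile("partition:\\s*(\\d+)\\s+leader:\\s*(\\d+)");

    // Builds partition id -> leader broker id from the topic listing text.
    static Map<Integer, Integer> leadersByPartition(String listing) {
        Map<Integer, Integer> leaders = new TreeMap<>();
        Matcher m = LINE.matcher(listing);
        while (m.find()) {
            leaders.put(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
        }
        return leaders;
    }

    // The set of broker ids that currently lead at least one partition.
    static Set<Integer> distinctLeaders(String listing) {
        return new TreeSet<>(leadersByPartition(listing).values());
    }

    public static void main(String[] args) {
        String listing = String.join("\n",
                "topic: topictest        partition: 0    leader: 1       replicas: 1,3,2 isr: 1,2,3",
                "topic: topictest        partition: 1    leader: 1       replicas: 2,1,3 isr: 1,2,3",
                "topic: topictest        partition: 2    leader: 1       replicas: 3,2,1 isr: 1,2,3",
                "topic: topictest        partition: 3    leader: 1       replicas: 1,2,3 isr: 1,2,3",
                "topic: topictest        partition: 4    leader: 1       replicas: 2,3,1 isr: 1,2,3");
        System.out.println(leadersByPartition(listing)); // {0=1, 1=1, 2=1, 3=1, 4=1}
        System.out.println(distinctLeaders(listing));    // [1]
    }
}
```

With every partition led by broker 1, the set of distinct leaders has a single
element, which would be consistent with a partition map that shows only one
host:port.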

Re: kafka.trident.ZkBrokerReader-brokers need refreshing

Posted by Sa Li <sa...@gmail.com>.
Is that 60000ms an internal setting? Where can I see this configuration?

thanks

Alec



Re: kafka.trident.ZkBrokerReader-brokers need refreshing

Posted by Sa Li <sa...@gmail.com>.
Thanks, Parth, that makes sense.


On Wed, Aug 13, 2014 at 12:57 PM, Parth Brahmbhatt <
pbrahmbhatt@hortonworks.com> wrote:

> 60000 ms is an internal setting for zk refresh see
> https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java#L30
> .
>
> Thanks
> Parth
>
> On Aug 13, 2014, at 9:44 AM, Sa Li <sa...@gmail.com> wrote:

Re: kafka.trident.ZkBrokerReader-brokers need refreshing

Posted by Parth Brahmbhatt <pb...@hortonworks.com>.
60000 ms is an internal setting for the ZooKeeper (zk) refresh interval; see
https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java#L30

Thanks
Parth
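
To illustrate where the 60000 in the log message can come from, here is a
minimal, self-contained sketch of a time-based refresh check. It is a
simplified model, not the storm-kafka source: the class BrokerRefreshSketch and
the method refreshIfDue are hypothetical. The one detail borrowed from the
library is the assumption that the linked ZkHosts line declares a refresh
frequency in seconds (a field named refreshFreqSecs, defaulting to 60); please
check the source for your version before relying on it.

```java
// Hypothetical model of a periodic broker refresh; not storm-kafka code.
final class BrokerRefreshSketch {
    private final long refreshMillis; // refresh interval in milliseconds
    private long lastRefreshMs;       // time of the last refresh

    BrokerRefreshSketch(int refreshFreqSecs, long nowMs) {
        // 60 seconds becomes the 60000 ms seen in the log message.
        this.refreshMillis = refreshFreqSecs * 1000L;
        this.lastRefreshMs = nowMs;
    }

    // Returns true and restarts the timer once more than refreshMillis
    // have elapsed since the last refresh; otherwise returns false.
    boolean refreshIfDue(long nowMs) {
        if (nowMs > lastRefreshMs + refreshMillis) {
            System.out.println("brokers need refreshing because "
                    + refreshMillis + "ms have expired");
            lastRefreshMs = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        BrokerRefreshSketch reader = new BrokerRefreshSketch(60, 0L);
        System.out.println(reader.refreshIfDue(59_000L)); // not due yet
        System.out.println(reader.refreshIfDue(60_001L)); // due, logs the message
    }
}
```

Under this model the message is routine housekeeping rather than an error. If
the field is public in your version, as assumed above, setting it on the
ZkHosts instance before building the spout config should change the interval.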



-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.

Re: kafka.trident.ZkBrokerReader-brokers need refreshing

Posted by Sa Li <sa...@gmail.com>.
Hi, Siddharth

I have only used a Trident topology with the Kafka spout, since I thought I
could easily add an .each function to parse the stream. I do include two
storm-kafka packages in my pom; see below:

    <!-- Storm-Kafka compiled -->
    <dependency>
        <artifactId>storm-kafka</artifactId>
        <groupId>org.apache.storm</groupId>
        <version>0.9.2-incubating</version>
        <!--
        <scope>compile</scope>
        -->
        <!-- exclude the zookeeper package from storm-Kafka -->
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.0-wip16a-scala292</version>
        <!-- exclude the zookeeper package from storm-Kafka -->
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

When I first ran the KafkaSpout, I was stuck for a while. After going back
and forth with the developers here, I realized that version conflicts are a
real issue to watch for: you must keep the ZooKeeper, Storm, and Kafka
versions consistent, or exclude the conflicting package in the pom; otherwise
you will run into problems.

Thanks

Alec
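
When versions conflict, it can help to check which jar actually supplies a
class at runtime. The sketch below is one way to do that with the standard
class-loading API. The class ClasspathOriginSketch and the method originOf are
hypothetical helpers, and the class names looked up in main are assumed to be
typical entry points of ZooKeeper, storm-kafka, and the Kafka 0.8 client;
adjust them to your own setup.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Storm, Kafka, or ZooKeeper.
final class ClasspathOriginSketch {
    // Returns the location (usually a jar URL) a class was loaded from,
    // or a short note if it is missing or comes from the JDK itself.
    static String originOf(String className) {
        try {
            // initialize=false: locate the class without running static initializers.
            Class<?> cls = Class.forName(
                    className, false, ClasspathOriginSketch.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "JDK runtime (no code source)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.zookeeper.ZooKeeper",
            "storm.kafka.ZkHosts",
            "kafka.javaapi.consumer.SimpleConsumer"
        };
        for (String name : names) {
            System.out.println(name + " -> " + originOf(name));
        }
    }
}
```

Run with the topology's classpath, this prints one location per class. A jar
with an unexpected version in its file name points at the dependency that
needs an exclusion.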


On Wed, Aug 13, 2014 at 7:20 AM, siddharth ubale <si...@gmail.com>
wrote:

Re: kafka.trident.ZkBrokerReader-brokers need refreshing

Posted by siddharth ubale <si...@gmail.com>.
Hi,

Just curious, did you face any issue using the Kafka spout without Trident?
Are you also able to use the KafkaSpout packaged with Storm?
I am asking because I am unable to use KafkaSpout(SpoutConfig) to read from a
Kafka topic. I am using Kafka 0.8.1.1 and Storm 0.9.0.1.
Can you let me know about any issues you faced?

I get no error when I submit my program, only a never-ending sequence of the
following:
68582 [Thread-33-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
68588 [Thread-33-words] INFO  backtype.storm.daemon.task - Emitting: words __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@a92bbf9> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024, population=1}]> #<DataPoint [kafkaOffset = {totalLatestTime=0, totalSpoutLag=0, totalLatestEmittedOffset=0}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>]]
68649 [Thread-33-words] INFO  storm.kafka.ZkCoordinator - Refreshing partition manager connections
68649 [Thread-34-words] INFO  storm.kafka.ZkCoordinator - Refreshing partition manager connections
68664 [Thread-33-words] INFO  storm.kafka.ZkCoordinator - Deleted partition managers: []
68664 [Thread-33-words] INFO  storm.kafka.ZkCoordinator - New partition managers: []
68664 [Thread-33-words] INFO  storm.kafka.ZkCoordinator - Finished refreshing
68665 [Thread-34-words] INFO  storm.kafka.ZkCoordinator - Deleted partition managers: []
68665 [Thread-34-words] INFO  storm.kafka.ZkCoordinator - New partition managers: []
68665 [Thread-34-words] INFO  storm.kafka.ZkCoordinator - Finished refreshing

Thanks,
Siddharth


