You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Nishu <ni...@gmail.com> on 2014/05/01 18:40:42 UTC

Re: Question on Acking

Hi,

Currently I am also working on KafkaSpout, but my bolt is not emitting any
message. Kafka Topic has various messages.When I consume messages from
kafka consumer on terminal, it shows all the messages.
But while executing Topology, getting following logs :

68503 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Refreshing
partition manager connections
68503 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Refreshing
partition manager connections
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Deleted partition
managers: []
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - New partition
managers: []
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Finished
refreshing
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Deleted partition
managers: []
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - New partition
managers: []
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Finished
refreshing

Can you please share which storm-kafka dependencies and versions are using
in your pom.xml?

Any help would be really appreciated.

Thanks,
Nishu


On Wed, Apr 30, 2014 at 1:15 PM, Qian Lin <li...@gmail.com>wrote:

> The BaseBasicBolt instance is executed in BasicBoltExecutor, whose
> execute() method is shown below:
>
>     public void execute(Tuple input) {
>         _collector.setContext(input);
>         try {
>             _bolt.execute(input, _collector);
>             _collector.getOutputter().ack(input);
>         } catch(FailedException e) {
>             if(e instanceof ReportedFailedException) {
>                 _collector.reportError(e);
>             }
>             _collector.getOutputter().fail(input);
>         }
>     }
>
> As can be seen, the BaseBasicBolt is only useful for the pattern of
> reading an input tuple, emitting tuples based on it, and then acking the
> tuple at the end of the execute() method. In other words, the ack() is
> actualized in the BasicBoltExecutor rather than the BaseBasicBolt. Even
> if the BaseBasicBolt instance is a sink bolt (i.e., emitting nothing), it
> will still ack the input after its execute() finishes each time.
>
> Qian
>
>
> On Tue, Apr 29, 2014 at 11:46 PM, Kashyap Mhaisekar <ka...@gmail.com>wrote:
>
>> Hi,
>> I have a strange problem my topology. I use KafkaSpout to read from a
>> kafka topic and i find that the topology stops consuming messages after a
>> while, without apparent reason.
>> I suspect this on acking.
>>
>> I use BaseBasicBolt (due to auto acking capabilities) and what I do in
>> bolt is use a condition to emit tuples out.
>> My question is -
>> 1. When I emit from an execute(...), does that mean acking happens
>> automatically here?
>> 2. What if I dont emit all tuples? If I use a condition like the code
>> highlighted below, does it mean that acking does not happen when execute
>> method is not complete?
>> 3. How do I ack from BasicOutputCollector? I mean, OutputCollector has
>> method *ack *used for acking while BasicOutputCollector has no such
>> method. What do I do to explicitly ack using BasicOutputCollector?
>> 4. If I have a bolt that saves values to DB and does not emit anything,
>> will it cause an problem?
>>
>> E.g.,
>>
>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>             String sentence = tuple.getString(0);
>>             for(String word: sentence.split(" ")) {
>>                 *if (word.equals("the"))* collector.emit(new Values(word));
>>             }
>>         }
>>
>> Please help!
>>
>> Regards,
>> Kashyap
>>
>
>


-- 
with regards,
Nishu Tayal

Re: Question on Acking

Posted by Kashyap Mhaisekar <ka...@gmail.com>.
Nishu,
I use
<dependency>
<groupId>net.wurstmeister.storm</groupId>
<artifactId>storm-kafka-0.8-plus</artifactId>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0-rc3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.0</artifactId>
<version>0.8.1</version>
</dependency>

for kafkaspout

Regards,
kashyap


On Thu, May 1, 2014 at 11:40 AM, Nishu <ni...@gmail.com> wrote:

> Hi,
>
> Currently I am also working on KafkaSpout, but my bolt is not emitting any
> message. Kafka Topic has various messages.When I consume messages from
> kafka consumer on terminal, it shows all the messages.
> But while executing Topology, getting following logs :
>
> 68503 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Refreshing
> partition manager connections
> 68503 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Refreshing
> partition manager connections
> 68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Deleted
> partition managers: []
> 68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - New partition
> managers: []
> 68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Finished
> refreshing
> 68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Deleted
> partition managers: []
> 68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - New partition
> managers: []
> 68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Finished
> refreshing
>
> Can you please share which storm-kafka dependencies and versions are using
> in your pom.xml?
>
> Any help would be really appreciated.
>
> Thanks,
> Nishu
>
>
> On Wed, Apr 30, 2014 at 1:15 PM, Qian Lin <li...@gmail.com>wrote:
>
>> The BaseBasicBolt instance is executed in BasicBoltExecutor, whose
>> execute() method is shown below:
>>
>>     public void execute(Tuple input) {
>>         _collector.setContext(input);
>>         try {
>>             _bolt.execute(input, _collector);
>>             _collector.getOutputter().ack(input);
>>         } catch(FailedException e) {
>>             if(e instanceof ReportedFailedException) {
>>                 _collector.reportError(e);
>>             }
>>             _collector.getOutputter().fail(input);
>>         }
>>     }
>>
>> As can be seen, the BaseBasicBolt is only useful for the pattern of
>> reading an input tuple, emitting tuples based on it, and then acking the
>> tuple at the end of the execute() method. In other words, the ack() is
>> actualized in the BasicBoltExecutor rather than the BaseBasicBolt. Even
>> if the BaseBasicBolt instance is a sink bolt (i.e., emitting nothing),
>> it will still ack the input after its execute() finishes each time.
>>
>> Qian
>>
>>
>> On Tue, Apr 29, 2014 at 11:46 PM, Kashyap Mhaisekar <ka...@gmail.com>wrote:
>>
>>> Hi,
>>> I have a strange problem my topology. I use KafkaSpout to read from a
>>> kafka topic and i find that the topology stops consuming messages after a
>>> while, without apparent reason.
>>> I suspect this on acking.
>>>
>>> I use BaseBasicBolt (due to auto acking capabilities) and what I do in
>>> bolt is use a condition to emit tuples out.
>>> My question is -
>>> 1. When I emit from an execute(...), does that mean acking happens
>>> automatically here?
>>> 2. What if I dont emit all tuples? If I use a condition like the code
>>> highlighted below, does it mean that acking does not happen when execute
>>> method is not complete?
>>> 3. How do I ack from BasicOutputCollector? I mean, OutputCollector has
>>> method *ack *used for acking while BasicOutputCollector has no such
>>> method. What do I do to explicitly ack using BasicOutputCollector?
>>> 4. If I have a bolt that saves values to DB and does not emit anything,
>>> will it cause an problem?
>>>
>>> E.g.,
>>>
>>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>             String sentence = tuple.getString(0);
>>>             for(String word: sentence.split(" ")) {
>>>                 *if (word.equals("the"))* collector.emit(new Values(word));
>>>             }
>>>         }
>>>
>>> Please help!
>>>
>>> Regards,
>>> Kashyap
>>>
>>
>>
>
>
> --
> with regards,
> Nishu Tayal
>