Posted to user@storm.apache.org by Nishu <ni...@gmail.com> on 2014/05/01 18:40:42 UTC
Re: Question on Acking
Hi,
Currently I am also working with KafkaSpout, but my bolt is not emitting any
messages. The Kafka topic has various messages. When I consume messages with
the Kafka console consumer on a terminal, it shows all the messages.
But while executing the topology, I get the following logs:
68503 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Refreshing
partition manager connections
68503 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Refreshing
partition manager connections
68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Deleted partition
managers: []
68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - New partition
managers: []
68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Finished
refreshing
68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Deleted partition
managers: []
68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - New partition
managers: []
68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Finished
refreshing
Can you please share which storm-kafka dependencies and versions you are
using in your pom.xml?
Any help would be really appreciated.
Thanks,
Nishu
On Wed, Apr 30, 2014 at 1:15 PM, Qian Lin <li...@gmail.com> wrote:
> The BaseBasicBolt instance is executed in BasicBoltExecutor, whose
> execute() method is shown below:
>
> public void execute(Tuple input) {
>     _collector.setContext(input);
>     try {
>         _bolt.execute(input, _collector);
>         _collector.getOutputter().ack(input);
>     } catch (FailedException e) {
>         if (e instanceof ReportedFailedException) {
>             _collector.reportError(e);
>         }
>         _collector.getOutputter().fail(input);
>     }
> }
>
> As can be seen, BaseBasicBolt is only suited to the pattern of reading
> an input tuple, emitting tuples based on it, and then acking the input
> at the end of the execute() method. In other words, the ack() is
> performed by the BasicBoltExecutor rather than by the BaseBasicBolt
> itself. Even if the BaseBasicBolt instance is a sink bolt (i.e., it emits
> nothing), the input will still be acked each time its execute() finishes.
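[Editor's note: the try/ack/fail control flow of BasicBoltExecutor described above can be simulated in plain Java, without Storm on the classpath. The FailedException stand-in, the counters, and the execute() helper below are all illustrative, not Storm classes; they only mirror the wrapper's logic: ack unless the bolt body throws.]

```java
import java.util.concurrent.atomic.AtomicInteger;

// Mirrors the control flow of BasicBoltExecutor.execute() with stand-in
// types: the input is acked after the bolt body returns (even if nothing
// was emitted), and failed only if the body throws FailedException.
public class AckFlowDemo {
    static class FailedException extends RuntimeException {}

    static final AtomicInteger acked = new AtomicInteger();
    static final AtomicInteger failed = new AtomicInteger();

    static void execute(Runnable boltBody) {
        try {
            boltBody.run();
            acked.incrementAndGet();   // reached even for a sink-style bolt
        } catch (FailedException e) {
            failed.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        execute(() -> {});                               // emits nothing, still acked
        execute(() -> { throw new FailedException(); }); // explicit failure
        System.out.println("acked=" + acked + " failed=" + failed);
    }
}
```

This is why a BaseBasicBolt needs no explicit ack call: the wrapper does it unconditionally on normal return.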
>
> Qian
>
>
> On Tue, Apr 29, 2014 at 11:46 PM, Kashyap Mhaisekar <ka...@gmail.com> wrote:
>
>> Hi,
>> I have a strange problem in my topology. I use KafkaSpout to read from a
>> Kafka topic, and I find that the topology stops consuming messages after a
>> while, without apparent reason.
>> I suspect this is related to acking.
>>
>> I use BaseBasicBolt (due to its auto-acking capability), and what I do in
>> the bolt is use a condition to decide which tuples to emit.
>> My questions are:
>> 1. When I emit from execute(...), does that mean acking happens
>> automatically here?
>> 2. What if I don't emit all tuples? If I use a condition like in the code
>> below, does it mean that acking does not happen when nothing is emitted
>> from execute()?
>> 3. How do I ack from BasicOutputCollector? I mean, OutputCollector has an
>> ack method used for acking, while BasicOutputCollector has no such
>> method. What do I do to explicitly ack using BasicOutputCollector?
>> 4. If I have a bolt that saves values to a DB and does not emit anything,
>> will it cause a problem?
>>
>> E.g.,
>>
>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>     String sentence = tuple.getString(0);
>>     for (String word : sentence.split(" ")) {
>>         if (word.equals("the")) collector.emit(new Values(word));
>>     }
>> }
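[Editor's note: the emit condition in the snippet above can be exercised as plain Java. The emitted() helper below is illustrative, standing in for collector.emit; it shows that filtering only limits which words go downstream, and is separate from whether the input tuple gets acked, which happens once per input regardless.]

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java version of the conditional emit: collect only the words that
// match the condition, as collector.emit would forward them downstream.
public class FilterDemo {
    static List<String> emitted(String sentence) {
        List<String> out = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            if (word.equals("the")) out.add(word);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [the, the]
        System.out.println(emitted("the quick brown fox jumps over the lazy dog"));
    }
}
```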
>>
>> Please help!
>>
>> Regards,
>> Kashyap
>>
>
>
--
with regards,
Nishu Tayal
Re: Question on Acking
Posted by Kashyap Mhaisekar <ka...@gmail.com>.
Nishu,
I use
<dependency>
    <groupId>net.wurstmeister.storm</groupId>
    <artifactId>storm-kafka-0.8-plus</artifactId>
    <version>0.4.0</version>
</dependency>
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0-rc3</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.8.0</artifactId>
    <version>0.8.1</version>
</dependency>
for the KafkaSpout.
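[Editor's note: with those dependencies, the spout wiring typically looks roughly like the sketch below. The ZooKeeper address, topic name, zkRoot, ids, and MyBolt are all placeholders, not values from this thread. Empty "New partition managers: []" logs like the ones above often point at a topic/ZooKeeper mismatch, so double-checking these arguments is a reasonable first step; setting forceFromStart while debugging makes the spout re-read the topic from the beginning.]

```java
// Sketch only: wiring a KafkaSpout with storm-kafka-0.8-plus 0.4.0.
// "localhost:2181", "mytopic", "/kafka-spout", the ids, and MyBolt are placeholders.
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic", "/kafka-spout", "kafka-spout-id");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.forceFromStart = true;  // re-read from the beginning while debugging

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);
builder.setBolt("bolt", new MyBolt(), 1).shuffleGrouping("spout");
```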
Regards,
kashyap