Posted to user@storm.apache.org by Stuart Perks <st...@hotmail.com> on 2015/08/17 23:56:57 UTC

Ack not being called

Hi, I am attempting to run guaranteed message processing, but ack is not being called. I have also posted this on Stack Overflow, if you prefer to answer there:
http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working


Thanks


I am trying to implement guaranteed message processing, but the ack and fail methods on the spout are not being called.

I am passing a message ID object when emitting from the spout. In each bolt I pass the input tuple along with the emit and then call collector.ack(tuple).

Question: neither ack nor fail is being called, and I cannot work out why.

Here is a shortened code sample.

Spout Code using BaseRichSpout

public void nextTuple() {

    .... further code ....

    String msgID = UUID.randomUUID().toString()
                    + System.currentTimeMillis();

    Values value = new Values(splitUsage[0], splitUsage[1],
                    splitUsage[2], msgID);
    outputCollector.emit(value, msgID);

}

@Override
public void ack(Object msgId) {
    this.pendingTuples.remove(msgId);
    LOG.info("Ack " + msgId);
}

@Override
public void fail(Object msgId) {
    // Re-emit the tuple
    LOG.info("Fail " + msgId);
    this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
}

Bolt Code using BaseRichBolt

@Override
public void execute(Tuple inputTuple) {
    this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
    this.outputCollector.ack(inputTuple);
}

Final Bolt

@Override
public void execute(Tuple inputTuple) {
  ..... Simply reports does not emit .....
  this.outputCollector.ack(inputTuple);
}



Re: Ack not being called

Posted by Stuart Perks <st...@hotmail.com>.
Worked it out. 

Removing a for loop in the spout that was wrapped around the emit fixed it.

Any ideas why this makes a difference?
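
One possible explanation, offered as a sketch rather than a diagnosis of the elided spout code: Storm calls nextTuple(), ack() and fail() on the same thread of a spout task, so acks can only be delivered between nextTuple() calls. If a loop keeps nextTuple() from returning, ack() and fail() never get a turn. The toy model below is plain Java with no Storm dependency; SpoutLoopSketch, ToySpout, OnePerCall, LoopInside and run are hypothetical names, and the model assumes every emitted tuple is ready to be acked as soon as nextTuple() returns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of a spout task's single thread; NOT Storm's real classes. */
class SpoutLoopSketch {

    /** Hypothetical stand-in for the two spout callbacks of interest. */
    interface ToySpout {
        void nextTuple(Queue<Integer> emitted, List<String> log);

        default void ack(int msgId, List<String> log) {
            log.add("ack " + msgId);
        }
    }

    /** Emits exactly one tuple per call, then returns control to the loop. */
    static class OnePerCall implements ToySpout {
        private int next = 0;

        public void nextTuple(Queue<Integer> emitted, List<String> log) {
            log.add("emit " + next);
            emitted.add(next++);
        }
    }

    /** Emits a whole batch inside a for loop before returning. */
    static class LoopInside implements ToySpout {
        private final int batch;
        private int next = 0;

        LoopInside(int batch) {
            this.batch = batch;
        }

        public void nextTuple(Queue<Integer> emitted, List<String> log) {
            for (int i = 0; i < batch; i++) {
                log.add("emit " + next);
                emitted.add(next++);
            }
        }
    }

    /**
     * Single-threaded loop: one nextTuple() call, then acks for everything
     * emitted so far. Acks are only delivered between nextTuple() calls.
     */
    static List<String> run(ToySpout spout, int calls) {
        Queue<Integer> emitted = new ArrayDeque<>();
        List<String> log = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            spout.nextTuple(emitted, log);
            while (!emitted.isEmpty()) {
                spout.ack(emitted.remove(), log);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [emit 0, ack 0, emit 1, ack 1, emit 2, ack 2]
        System.out.println(run(new OnePerCall(), 3));
        // prints [emit 0, emit 1, emit 2, ack 0, ack 1, ack 2]
        System.out.println(run(new LoopInside(3), 1));
    }
}
```

With a small batch the acks are merely delayed until the loop finishes. With a loop that runs for a very long time, or never ends, they would not arrive at all in this model, which would match the behaviour described above.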


> On 18 Aug 2015, at 06:12, Abhishek Agarwal <ab...@gmail.com> wrote:
> 
> Couple of questions - 
> 1. Are you adding the tuples to pendingTuple list before emitting them in the list? Since I didn't see that in the code.
> 2. Is logging correctly configured? Can you use sysout instead of log.info and then try out. 
> 
> On Tue, Aug 18, 2015 at 4:02 AM, Stuart Perks <st...@hotmail.com> wrote:
> Set to 23 the same number as the workers are set to. 
> 
> thanks
>> On 17 Aug 2015, at 23:04, Javier Gonzalez <ja...@gmail.com> wrote:
>> 
>> How many ackers have you got configured when you submit your topology?
>> 
> 
> 
> 
> 
> -- 
> Regards,
> Abhishek Agarwal
> 


Re: Ack not being called

Posted by Abhishek Agarwal <ab...@gmail.com>.
A couple of questions:
1. Are you adding the tuples to the pendingTuples collection before emitting
them? I didn't see that in the code.
2. Is logging correctly configured? Can you use System.out instead of
LOG.info and try again?
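
To illustrate question 1, here is a minimal plain-Java sketch of the bookkeeping that the posted fail() method relies on. It assumes pendingTuples is a map keyed by message ID, which the shortened sample does not show. PendingTuplesSketch, beforeEmit, onAck and onFail are hypothetical names, and no Storm classes are involved.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of spout-side replay bookkeeping; no Storm classes. */
class PendingTuplesSketch {

    // msgId -> values emitted with that ID (assumed shape of pendingTuples)
    private final Map<Object, List<Object>> pendingTuples = new HashMap<>();

    /** Record the values BEFORE handing them to the collector. */
    void beforeEmit(Object msgId, List<Object> values) {
        pendingTuples.put(msgId, values);
    }

    /** The tuple tree completed, so the entry is no longer needed. */
    void onAck(Object msgId) {
        pendingTuples.remove(msgId);
    }

    /** Returns the values to re-emit, or null if they were never recorded. */
    List<Object> onFail(Object msgId) {
        return pendingTuples.get(msgId);
    }

    public static void main(String[] args) {
        PendingTuplesSketch tracker = new PendingTuplesSketch();
        List<Object> values = Arrays.<Object>asList("a", "b", "c", "id-1");

        tracker.beforeEmit("id-1", values);
        System.out.println(tracker.onFail("id-1"));   // the recorded values
        tracker.onAck("id-1");
        System.out.println(tracker.onFail("id-1"));   // null after the ack
        System.out.println(tracker.onFail("id-2"));   // null, never recorded
    }
}
```

If nothing is stored before the emit, the lookup in fail() yields null and there is nothing to replay.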

On Tue, Aug 18, 2015 at 4:02 AM, Stuart Perks <st...@hotmail.com> wrote:

> Set to 23 the same number as the workers are set to.
>
> thanks
>
> On 17 Aug 2015, at 23:04, Javier Gonzalez <ja...@gmail.com> wrote:
>
> How many ackers have you got configured when you submit your topology?


-- 
Regards,
Abhishek Agarwal

Re: Ack not being called

Posted by Stuart Perks <st...@hotmail.com>.
Set to 23, the same number as the workers.

thanks
> On 17 Aug 2015, at 23:04, Javier Gonzalez <ja...@gmail.com> wrote:
> 
> How many ackers have you got configured when you submit your topology?
> 


Re: Ack not being called

Posted by Javier Gonzalez <ja...@gmail.com>.
How many ackers have you got configured when you submit your topology?