You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Stuart Perks <st...@hotmail.com> on 2015/08/17 23:56:57 UTC
Ack not being called
Hi I am attempting to run guaranteed message processing but ACK is not being called. Post on stack overflow if you prefer answer there.
http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working <http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working>
Thanks
0
down vote
<>favorite
<http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working#>
I am trying to implement the guaranteed message processing but the ack or fail methods on the Spout are not being called.
I am passing the a message ID object with the spout. I am passing the tuple with each bolt and calling collector.ack(tuple) in each bolt.
Question The ack or fail is not being called and I cannot work out why?
Here is a shortened code sample.
Spout Code using BaseRichSpout
public void nextTuple() {
.... further code ....
String msgID = UUID.randomUUID().toString()
+ System.currentTimeMillis();
Values value = new Values(splitUsage[0], splitUsage[1],
splitUsage[2], msgID);
outputCollector.emit(value, msgID);
}
@Override
public void ack(Object msgId) {
this.pendingTuples.remove(msgId);
LOG.info("Ack " + msgId);
}
@Override
public void fail(Object msgId) {
// Re-emit the tuple
LOG.info("Fail " + msgId);
this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
}
Bolt Code using BaseRichBolt
@Override
public void execute(Tuple inputTuple) {
this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
this.outputCollector.ack(inputTuple);
}
Final Bolt
@Override
public void execute(Tuple inputTuple) {
..... Simply reports does not emit .....
this.outputCollector.ack(inputTuple);
}
Re: Ack not being called
Posted by Stuart Perks <st...@hotmail.com>.
Worked it out.
Removing a for loop in the spout wrapped around the emit fixed it.
Any ideas why this makes a different?
> On 18 Aug 2015, at 06:12, Abhishek Agarwal <ab...@gmail.com> wrote:
>
> Couple of questions -
> 1. Are you adding the tuples to pendingTuple list before emitting them in the list? Since I didn't see that in the code.
> 2. Is logging correctly configured? Can you use sysout instead of log.info <http://log.info/> and then try out.
>
> On Tue, Aug 18, 2015 at 4:02 AM, Stuart Perks <stup192@hotmail.com <ma...@hotmail.com>> wrote:
> Set to 23 the same number as the workers are set to.
>
> thanks
>> On 17 Aug 2015, at 23:04, Javier Gonzalez <jagonzal@gmail.com <ma...@gmail.com>> wrote:
>>
>> How many ackers have you got configured when you submit your topology?
>>
>> On Aug 17, 2015 5:57 PM, "Stuart Perks" <stup192@hotmail.com <ma...@hotmail.com>> wrote:
>> Hi I am attempting to run guaranteed message processing but ACK is not being called. Post on stack overflow if you prefer answer there.
>> http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working <http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working>
>>
>>
>> Thanks
>>
>>
>> 0
>> down vote
>> <>favorite
>> <http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working#>
>> I am trying to implement the guaranteed message processing but the ack or fail methods on the Spout are not being called.
>>
>> I am passing the a message ID object with the spout. I am passing the tuple with each bolt and calling collector.ack(tuple) in each bolt.
>>
>> Question The ack or fail is not being called and I cannot work out why?
>>
>> Here is a shortened code sample.
>>
>> Spout Code using BaseRichSpout
>>
>> public void nextTuple() {
>>
>> .... further code ....
>>
>> String msgID = UUID.randomUUID().toString()
>> + System.currentTimeMillis();
>>
>> Values value = new Values(splitUsage[0], splitUsage[1],
>> splitUsage[2], msgID);
>> outputCollector.emit(value, msgID);
>>
>> }
>>
>> @Override
>> public void ack(Object msgId) {
>> this.pendingTuples.remove(msgId);
>> LOG.info("Ack " + msgId);
>> }
>>
>> @Override
>> public void fail(Object msgId) {
>> // Re-emit the tuple
>> LOG.info("Fail " + msgId);
>> this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
>> }
>> Bolt Code using BaseRichBolt
>>
>> @Override
>> public void execute(Tuple inputTuple) {
>>
>> this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
>>
>> this.outputCollector.ack(inputTuple);
>> }
>> Final Bolt
>>
>> @Override
>> public void execute(Tuple inputTuple) {
>> ..... Simply reports does not emit .....
>> this.outputCollector.ack(inputTuple);
>> }
>>
>>
>
>
>
>
> --
> Regards,
> Abhishek Agarwal
>
Re: Ack not being called
Posted by Abhishek Agarwal <ab...@gmail.com>.
Couple of questions -
1. Are you adding the tuples to pendingTuple list before emitting them in
the list? Since I didn't see that in the code.
2. Is logging correctly configured? Can you use sysout instead of log.info
and then try out.
On Tue, Aug 18, 2015 at 4:02 AM, Stuart Perks <st...@hotmail.com> wrote:
> Set to 23 the same number as the workers are set to.
>
> thanks
>
> On 17 Aug 2015, at 23:04, Javier Gonzalez <ja...@gmail.com> wrote:
>
> How many ackers have you got configured when you submit your topology?
> On Aug 17, 2015 5:57 PM, "Stuart Perks" <st...@hotmail.com> wrote:
>
>> Hi I am attempting to run guaranteed message processing but ACK is not
>> being called. Post on stack overflow if you prefer answer there.
>> http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working
>>
>>
>> Thanks
>>
>>
>> 0down votefavorite
>> <http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working#>
>>
>> I am trying to implement the guaranteed message processing but the ack or
>> fail methods on the Spout are not being called.
>>
>> I am passing the a message ID object with the spout. I am passing the
>> tuple with each bolt and calling collector.ack(tuple) in each bolt.
>>
>> *Question* The ack or fail is not being called and I cannot work out why?
>>
>> Here is a shortened code sample.
>>
>> *Spout Code using BaseRichSpout*
>>
>> public void nextTuple() {
>>
>> .... further code ....
>>
>> String msgID = UUID.randomUUID().toString()
>> + System.currentTimeMillis();
>>
>> Values value = new Values(splitUsage[0], splitUsage[1],
>> splitUsage[2], msgID);
>> outputCollector.emit(value, msgID);
>> }
>> @Overridepublic void ack(Object msgId) {
>> this.pendingTuples.remove(msgId);
>> LOG.info("Ack " + msgId);}
>> @Overridepublic void fail(Object msgId) {
>> // Re-emit the tuple
>> LOG.info("Fail " + msgId);
>> this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);}
>>
>> *Bolt Code using BaseRichBolt*
>>
>> @Overridepublic void execute(Tuple inputTuple) {
>> this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
>> this.outputCollector.ack(inputTuple);}
>>
>> *Final Bolt*
>>
>> @Overridepublic void execute(Tuple inputTuple) {
>> ..... Simply reports does not emit .....
>> this.outputCollector.ack(inputTuple);
>>
>> }
>>
>>
>
--
Regards,
Abhishek Agarwal
Re: Ack not being called
Posted by Stuart Perks <st...@hotmail.com>.
Set to 23 the same number as the workers are set to.
thanks
> On 17 Aug 2015, at 23:04, Javier Gonzalez <ja...@gmail.com> wrote:
>
> How many ackers have you got configured when you submit your topology?
>
> On Aug 17, 2015 5:57 PM, "Stuart Perks" <stup192@hotmail.com <ma...@hotmail.com>> wrote:
> Hi I am attempting to run guaranteed message processing but ACK is not being called. Post on stack overflow if you prefer answer there.
> http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working <http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working>
>
>
> Thanks
>
>
> 0
> down vote
> <>favorite
> <http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working#>
> I am trying to implement the guaranteed message processing but the ack or fail methods on the Spout are not being called.
>
> I am passing the a message ID object with the spout. I am passing the tuple with each bolt and calling collector.ack(tuple) in each bolt.
>
> Question The ack or fail is not being called and I cannot work out why?
>
> Here is a shortened code sample.
>
> Spout Code using BaseRichSpout
>
> public void nextTuple() {
>
> .... further code ....
>
> String msgID = UUID.randomUUID().toString()
> + System.currentTimeMillis();
>
> Values value = new Values(splitUsage[0], splitUsage[1],
> splitUsage[2], msgID);
> outputCollector.emit(value, msgID);
>
> }
>
> @Override
> public void ack(Object msgId) {
> this.pendingTuples.remove(msgId);
> LOG.info("Ack " + msgId);
> }
>
> @Override
> public void fail(Object msgId) {
> // Re-emit the tuple
> LOG.info("Fail " + msgId);
> this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
> }
> Bolt Code using BaseRichBolt
>
> @Override
> public void execute(Tuple inputTuple) {
>
> this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
>
> this.outputCollector.ack(inputTuple);
> }
> Final Bolt
>
> @Override
> public void execute(Tuple inputTuple) {
> ..... Simply reports does not emit .....
> this.outputCollector.ack(inputTuple);
> }
>
>
Re: Ack not being called
Posted by Javier Gonzalez <ja...@gmail.com>.
How many ackers have you got configured when you submit your topology?
On Aug 17, 2015 5:57 PM, "Stuart Perks" <st...@hotmail.com> wrote:
> Hi I am attempting to run guaranteed message processing but ACK is not
> being called. Post on stack overflow if you prefer answer there.
> http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working
>
>
> Thanks
>
>
> 0down votefavorite
> <http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working#>
>
> I am trying to implement the guaranteed message processing but the ack or
> fail methods on the Spout are not being called.
>
> I am passing the a message ID object with the spout. I am passing the
> tuple with each bolt and calling collector.ack(tuple) in each bolt.
>
> *Question* The ack or fail is not being called and I cannot work out why?
>
> Here is a shortened code sample.
>
> *Spout Code using BaseRichSpout*
>
> public void nextTuple() {
>
> .... further code ....
>
> String msgID = UUID.randomUUID().toString()
> + System.currentTimeMillis();
>
> Values value = new Values(splitUsage[0], splitUsage[1],
> splitUsage[2], msgID);
> outputCollector.emit(value, msgID);
> }
> @Overridepublic void ack(Object msgId) {
> this.pendingTuples.remove(msgId);
> LOG.info("Ack " + msgId);}
> @Overridepublic void fail(Object msgId) {
> // Re-emit the tuple
> LOG.info("Fail " + msgId);
> this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);}
>
> *Bolt Code using BaseRichBolt*
>
> @Overridepublic void execute(Tuple inputTuple) {
> this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
> this.outputCollector.ack(inputTuple);}
>
> *Final Bolt*
>
> @Overridepublic void execute(Tuple inputTuple) {
> ..... Simply reports does not emit .....
> this.outputCollector.ack(inputTuple);
>
> }
>
>