Posted to user@storm.apache.org by Ash G <as...@gmail.com> on 2014/10/15 00:20:31 UTC

Ack not working with Kafka Storm spout in Storm 0.9.2

Storm keeps re-reading the same data from Kafka topic again and again.

I have a Kafka spout and a series of bolts, and I am anchoring the tuple.
In the last bolt, I do _collector.ack(tuple).

I am using the Kafka Storm spout bundled with the Storm 0.9.2 distribution.
All bolts extend BaseRichBolt.
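
A minimal sketch of what my intermediate bolts look like (class and field
names simplified):

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Illustrative intermediate bolt: it anchors every emit to the input tuple
// but does not ack; only the last bolt in the chain calls ack.
public class PassThroughBolt extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Anchoring: the input tuple is passed as the first emit argument.
        _collector.emit(tuple, new Values(tuple.getString(0)));
        // No _collector.ack(tuple) here -- acking happens only in the last bolt.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}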

I checked the messageIds, and as the tuple moves through the bolts, the
following messageIds are displayed, i.e.:

{-210221166871835114=-8820821848891311855}

{-210221166871835114=5058142667727028101}

{-210221166871835114=-3068891797148595604}

Also I have

zkRoot = "/brokers"; //Zkroot will be used as root to store your consumer's
offset
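
For context, this is roughly how the spout is built (the ZooKeeper address,
topic, and consumer id below are placeholders):

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

// Placeholder addresses; substitute your own ZooKeeper ensemble and topic.
BrokerHosts hosts = new ZkHosts("zkhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/brokers", "my-consumer-id");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);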

It looks like the offsets are not being stored properly by the Kafka spout on
ack. This problem does not happen when numAckers = 0, i.e. when acking is disabled.

Any suggestions or clues on where I can look?

Re: Ack not working with Kafka Storm spout in Storm 0.9.2

Posted by Ash G <as...@gmail.com>.
Thank You,

"Also you need to  ack the tuple from intermediate bolts.  "

This worked. I was not aware we needed to ack from intermediate bolts.

Earlier, I was anchoring in all bolts and doing ack in only the final bolt.
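
So the execute method of each intermediate bolt now looks roughly like this
(field names simplified):

@Override
public void execute(Tuple tuple) {
    // Anchor the emitted tuple to the input tuple...
    _collector.emit(tuple, new Values(tuple.getString(0)));
    // ...and also ack the input tuple so the tuple tree can complete.
    _collector.ack(tuple);
}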

Ash


On Tue, Oct 14, 2014 at 6:23 PM, Harsha <st...@harsha.io> wrote:

>  Ash,
>     you need to anchor the tuple in the intermediate bolts as well. Please
> take a look at the docs here
>
> https://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html
>
> "java _collector.emit(new Values(word));
>
> Emitting the word tuple this way causes it to be *unanchored*. If the
> tuple fails be processed downstream, the root tuple will not be replayed.
> Depending on the fault-tolerance guarantees you need in your topology,
> sometimes it’s appropriate to emit an unanchored tuple."
> Also you need to  ack the tuple from intermediate bolts.
> -Harsha

Re: Ack not working with Kafka Storm spout in Storm 0.9.2

Posted by Harsha <st...@harsha.io>.
Ash,

    you need to anchor the tuple in the intermediate bolts as
well. Please take a look at the docs here

[1] https://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html



"java _collector.emit(new Values(word));

Emitting the word tuple this way causes it to be unanchored. If
the tuple fails be processed downstream, the root tuple will
not be replayed. Depending on the fault-tolerance guarantees
you need in your topology, sometimes it’s appropriate to emit
an unanchored tuple."

Also you need to ack the tuple from intermediate bolts.
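
To make the difference concrete (word stands for whatever value
your bolt emits):

// Anchored: the new tuple is tied to the input tuple, so a downstream
// failure causes the spout to replay the root tuple.
_collector.emit(tuple, new Values(word));

// Unanchored: no tie to the input tuple; a downstream failure will not
// cause the root tuple to be replayed.
_collector.emit(new Values(word));

// In either case, ack the input tuple once the bolt is done with it.
_collector.ack(tuple);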

-Harsha

References

1. https://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html

Re: Ack not working with Kafka Storm spout in Storm 0.9.2

Posted by Binita Bharati <bi...@gmail.com>.
Ash,

I have tried acking with storm-0.9.2-incubating. I acknowledged the tuple in
all intermediate bolts too.

E.g. topology:

Spout1 -> Bolt1 -> Bolt2

1) Spout1 emitted an anchored tuple.
nextTuple method:

collector.emit(new Values(emitWord), msgId);

2) Bolt1 and Bolt2 emitted a normal (unanchored) tuple, but acknowledged it.
execute method:

collector.emit(new Values(emitWord)); // unanchored tuple
collector.ack(tuple);
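
For completeness, a rough sketch of wiring up such a topology (Spout1, Bolt1,
and Bolt2 stand in for your own classes); note that conf.setNumAckers(0)
would disable acking altogether, which is why the replay problem disappears
in that mode:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout1", new Spout1());
builder.setBolt("bolt1", new Bolt1()).shuffleGrouping("spout1");
builder.setBolt("bolt2", new Bolt2()).shuffleGrouping("bolt1");

Config conf = new Config();
conf.setNumAckers(1); // 0 disables acking (and tuple replay) entirely

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("ack-test", conf, builder.createTopology());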

-Binita

