Posted to user@storm.apache.org by pradeep s <sr...@gmail.com> on 2017/09/01 05:04:12 UTC

Storm kafka spout offset going back

Hi,
I am using Storm 1.1.0 with storm-kafka-client version 1.1.1, and the Kafka
server is 0.10.1.1.

The Kafka spout polling strategy used is UNCOMMITTED_EARLIEST.

The message flow is shown below; it's a normal topology.

KafkaSpout --> AvroDeserializerBolt-->DataBaseInsertBolt.

If a message fails Avro deserialization, I move the message to an
error queue and acknowledge it from the Avro bolt. This message is not
emitted to the database bolt.

However, I observe that after I restart the topology, the offset for the
topic goes back to an old offset.

Will Kafka commit the offset only if the message is acked by all bolts?

Is this why the offset is going back to a previous value?

Thanks
Pradeep

Re: Storm kafka spout offset going back

Posted by Stig Rohde Døssing <sr...@apache.org>.
Okay. Please consider uncapping max uncommitted offsets (i.e. set it to
Integer.MAX_VALUE), so we can at least rule this out as a factor in the
hanging. You are also welcome to try out
https://github.com/apache/storm/pull/2156 if you need the functionality and
you're willing to test out the patch.

About the retry service, what I'm actually asking about is what is your
retry backoff. I'm essentially asking if it is possible that one or more
failed tuples are still waiting for retry when you reboot? The spout won't
commit past any tuples that are waiting for retry, so maybe it could be
causing issue #2?
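
To illustrate why a tuple that is still waiting for retry can hold the committed offset back, here is a minimal, self-contained sketch of the "no gaps" rule. It is a simplified model and does not use the storm-kafka-client classes; the names CommitSketch and nextCommitOffset are hypothetical, and the real OffsetManager logic may differ in detail.

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical stand-in for the spout's per-partition offset bookkeeping. */
final class CommitSketch {

    /**
     * Returns the offset a restarted consumer would resume from: the first
     * offset at or after committedOffset that has not been acked yet.
     */
    static long nextCommitOffset(long committedOffset, SortedSet<Long> acked) {
        long next = committedOffset;
        // Advance only across a gap-free run of acked offsets.
        while (acked.contains(next)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        // Offsets 0-9 were emitted; offset 3 failed and is still waiting for retry.
        SortedSet<Long> acked =
                new TreeSet<>(Arrays.asList(0L, 1L, 2L, 4L, 5L, 6L, 7L, 8L, 9L));
        System.out.println(nextCommitOffset(0L, acked)); // 3: the commit stops at the gap

        acked.add(3L); // the retry succeeds and is acked
        System.out.println(nextCommitOffset(0L, acked)); // 10
    }
}
```

In this model, restarting the topology while offset 3 is still pending means the consumer resumes from 3, even though offsets 4-9 were already processed. That would look like the offset "going back".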


2017-09-02 18:58 GMT+02:00 pradeep s <sr...@gmail.com>:

> Hi Stig,
> Max uncommitted offsets is set at 10_000 and retry threshold for exception
> handling is 10.
> Thanks
> Pradeep
>
>
>
> On Sat, Sep 2, 2017 at 4:46 AM, Stig Rohde Døssing <sr...@apache.org>
> wrote:
>
>> Thanks. The bolts look fine to me. I'd look at whether the tuples are
>> being acked on the spout (use the debug setting on Config), and the
>> OffsetManager class logs I linked earlier. I don't know if it's relevant to
>> your case, but please note that there are some cases where setting a low
>> maxUncommittedOffsets can cause the spout to stop polling for tuples. It's
>> being fixed, but please leave maxUncommittedOffsets at the default if
>> you're setting it to a custom value.
>>
>> What is your retry service configuration?
>>
>> 2017-09-02 0:11 GMT+02:00 pradeep s <sr...@gmail.com>:
>>
>>> Yes, Stig. The code I posted is for DataBaseInsertBolt. The emit from the
>>> last bolt is not needed.
>>>
>>> Problem 2 was for a separate topic. Problem 1 was observed for topics
>>> where processing failures were encountered previously.
>>>
>>> I have attached the error processing and bolt files.
>>>
>>> Thanks
>>> Pradeep
>>>
>>>
>>>
>>>
>>> On Fri, Sep 1, 2017 at 1:09 PM, Stig Rohde Døssing <sr...@apache.org>
>>> wrote:
>>>
>>>> Just to make sure I understand:
>>>>
>>>> This is your topology
>>>> KafkaSpout --> AvroDeserializerBolt-->DataBaseInsertBolt
>>>>
>>>> The bolt you posted the execute method for is the DataBaseInsertBolt,
>>>> right?
>>>> What are these statements for if this is the last bolt in the topology?
>>>> "super.getOutputCollector().emit(tuple, new Values(fullMessage));"
>>>> Are the topics you mention in problem 1 and 2 the same topic?
>>>> Essentially what I'm asking is whether the topic that is stuck is also the
>>>> one with failures that is starting over on an old offset?
>>>> Can you post your RetryService configuration?
>>>> You talked about moving tuples to an error queue if they fail
>>>> deserialization in the Avro bolt. Can you post that execute too?
>>>>
>>>> 2017-09-01 20:14 GMT+02:00 pradeep s <sr...@gmail.com>:
>>>>
>>>>> Thanks, Stig, for the response. I can give some more detail on the
>>>>> issue we are facing now.
>>>>> For any database failure, we retry the tuple up to 10 times. Database
>>>>> failures are mostly caused by parent-child relations, since we are
>>>>> processing out of order.
>>>>> Our consumer group has more than 10 topics, and each topic corresponds
>>>>> to one table. For example, topics A, B and C in a group correspond to
>>>>> tables A, B and C in the database.
>>>>> Here, table A is the parent, and tables B and C are child tables.
>>>>> Spout parallelism is set to 50 and each topic has 50 partitions. These
>>>>> 50 threads go round-robin across all the topics in the group.
>>>>>
>>>>> Issues observed with the current setup are:
>>>>>
>>>>> 1) One partition of one topic alone gets stuck. The lag on all the
>>>>> other partitions is cleared.
>>>>>
>>>>> 2) Whichever topic had failures earlier goes back to an old offset.
>>>>>
>>>>>
>>>>> DB Bolt Execute Method below
>>>>> =======================
>>>>> exceptionCount will have a value greater than 0 once the message is
>>>>> moved to the error queue. In that case I acknowledge the message. In
>>>>> other cases I call tuple.fail.
>>>>> There is no downstream bolt for this. This is the final bolt in the
>>>>> topology.
>>>>>
>>>>> @Override
>>>>> public void execute(final Tuple tuple) {
>>>>>     String fullMessage = (String) tuple.getValueByField(EXTRACTED_MESSAGE);
>>>>>     GGMessageDTO ggMessage = (GGMessageDTO) tuple.getValueByField(GG_MESSAGE);
>>>>>     try {
>>>>>         // Call to handler for generating Sql
>>>>>         Date date = new Date();
>>>>>         super.getMessageHandler().handleMessage(ggMessage, super.getGenericMessageDAO());
>>>>>         super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>>>         super.getOutputCollector().ack(tuple);
>>>>>         LOGGER.info("DbActionBolt Ack time in ms: {}", new Date().getTime() - date.getTime());
>>>>>     } catch (Exception e) {
>>>>>         LOGGER.error("DB bolt exception occurred from Aurora : ", e);
>>>>>         int exceptionCount = handleException(fullMessage, ggMessage, e, isNormalProcessing);
>>>>>         if (exceptionCount != -1) {
>>>>>             // If message write is success acknowledge the message so
>>>>>             // that it will be removed from kafka queue
>>>>>             super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>>>             super.getOutputCollector().ack(tuple);
>>>>>         } else {
>>>>>             super.getOutputCollector().reportError(e);
>>>>>             super.getOutputCollector().fail(tuple);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Sep 1, 2017 at 9:59 AM, Stig Rohde Døssing <sr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Pradeep,
>>>>>>
>>>>>> When you move the message to an error queue, is this happening from
>>>>>> inside the Avro bolt or are you emitting a tuple? Can you verify that the
>>>>>> tuple is being acked in the Avro bolt exactly once (double acking will
>>>>>> cause the tuple to fail)?
>>>>>>
>>>>>> Storm will ack messages on the spout as long as all edges in the
>>>>>> tuple tree are acked, and the topology message timeout hasn't expired
>>>>>> before this occurs.
>>>>>>
>>>>>> For example, if the Kafka spout emits t0 and your AvroDeserializerBolt
>>>>>> is the only bolt consuming from the spout, the bolt will receive t0 and
>>>>>> must ack it exactly once. If the AvroDeserializerBolt emits any tuples
>>>>>> anchored to t0 (using any of the
>>>>>> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/task/OutputCollector.html
>>>>>> methods that take a Tuple anchor), the downstream bolts must ack those
>>>>>> exactly once too. Let's say the Avro bolt emits t0_0 and t0_1 based on t0.
>>>>>> The root tuple on the spout is only acked if t0, t0_0 and t0_1 are acked
>>>>>> once each, and they all get acked before the message timeout elapses.
>>>>>>
>>>>>> Depending on your throughput this may be infeasible, but you might
>>>>>> try enabling debug logging
>>>>>> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html#setDebug-boolean-
>>>>>> which will let you tell whether the tuple is being acked on the spout.
>>>>>>
>>>>>> If the tuple is being acked on the spout, you might want to look at
>>>>>> some of the logs from this method
>>>>>> https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L64.
>>>>>> They should show you what the spout is doing internally. Keep in mind
>>>>>> that the spout can only commit e.g. offset 10 if offsets 0-9 have all
>>>>>> been acked/committed, so if an earlier tuple failed and is waiting for
>>>>>> retry when you restart, that could also cause this.
>>>>>>

Re: Storm kafka spout offset going back

Posted by pradeep s <sr...@gmail.com>.
Hi Stig,
Max uncommitted offsets is set at 10_000 and retry threshold for exception
handling is 10.
Thanks
Pradeep




Re: Storm kafka spout offset going back

Posted by Stig Rohde Døssing <sr...@apache.org>.
Thanks. The bolts look fine to me. I'd look at whether the tuples are being
acked on the spout (use the debug setting on Config), and the OffsetManager
class logs I linked earlier. I don't know if it's relevant to your case,
but please note that there are some cases where setting a low
maxUncommittedOffsets can cause the spout to stop polling for tuples. It's
being fixed, but please leave maxUncommittedOffsets at the default if
you're setting it to a custom value.

What is your retry service configuration?
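
As background for checking whether tuples are acked on the spout, the following is a minimal, self-contained sketch of XOR-based tuple-tree tracking, the general technique Storm's acker is documented to use. It is a simplified model, not Storm's implementation: the class AckTreeSketch, its methods, and the fixed tuple ids are hypothetical (real ids are random 64-bit values).

```java
/** Hypothetical, simplified model of how a tuple tree is tracked to completion. */
final class AckTreeSketch {
    // XOR of the ids of every tuple in the tree that is still pending.
    private long ackVal;

    /** The spout emits the root tuple with the given id. */
    AckTreeSketch(long rootId) {
        this.ackVal = rootId;
    }

    /** A bolt acks ackedId after emitting the given children anchored to it. */
    void ack(long ackedId, long... anchoredChildIds) {
        ackVal ^= ackedId;
        for (long child : anchoredChildIds) {
            ackVal ^= child;
        }
    }

    /** The tree is complete once every id has been XORed in exactly twice. */
    boolean isComplete() {
        return ackVal == 0L;
    }

    public static void main(String[] args) {
        long t0 = 0x1L, t00 = 0x2L, t01 = 0x4L;

        AckTreeSketch tree = new AckTreeSketch(t0);
        tree.ack(t0, t00, t01);                 // Avro bolt acks t0, anchors t0_0 and t0_1
        tree.ack(t00);                          // downstream bolt acks t0_0
        System.out.println(tree.isComplete());  // false: t0_1 is still pending
        tree.ack(t01);                          // downstream bolt acks t0_1
        System.out.println(tree.isComplete());  // true

        AckTreeSketch doubleAcked = new AckTreeSketch(t0);
        doubleAcked.ack(t0, t00, t01);
        doubleAcked.ack(t00);
        doubleAcked.ack(t00);                   // accidental second ack of t0_0
        doubleAcked.ack(t01);
        System.out.println(doubleAcked.isComplete()); // false: the tree never completes
    }
}
```

In this model a duplicate ack leaves the value non-zero, so the tree never completes. In a running topology that would presumably surface as a message timeout and a fail on the spout, which matches the earlier warning about double acking.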


Re: Storm kafka spout offset going back

Posted by pradeep s <sr...@gmail.com>.
Yes, Stig. The code I posted is for DataBaseInsertBolt. The emit from the
last bolt is not needed.

Problem 2 was for a separate topic. Problem 1 was observed for topics
where processing failures were encountered previously.

I have attached the error processing and bolt files.

Thanks
Pradeep
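
Following on from the point that the final bolt does not need to emit, here is a minimal sketch of a terminal bolt's ack-or-fail decision. It is not the attached bolt and does not use Storm's classes: Collector, Handler, ErrorQueue and RecordingCollector are hypothetical stand-ins, and tuples are modelled as plain strings. The outcome is decided first and the collector is called once afterwards, so no path can both ack and fail the same tuple.

```java
import java.util.ArrayList;
import java.util.List;

final class TerminalBoltSketch {

    /** Hypothetical stand-in for the ack/fail part of an output collector. */
    interface Collector {
        void ack(String tuple);
        void fail(String tuple);
    }

    /** Hypothetical stand-in for the database write; throws on failure. */
    interface Handler {
        void handle(String tuple) throws Exception;
    }

    /** Hypothetical error-queue writer; returns true if the message was parked there. */
    interface ErrorQueue {
        boolean offer(String tuple, Exception cause);
    }

    /** Test double that records every collector call in order. */
    static final class RecordingCollector implements Collector {
        final List<String> calls = new ArrayList<>();
        @Override public void ack(String tuple) { calls.add("ack:" + tuple); }
        @Override public void fail(String tuple) { calls.add("fail:" + tuple); }
    }

    static void execute(String tuple, Handler handler, ErrorQueue errorQueue, Collector collector) {
        boolean done;
        try {
            handler.handle(tuple);
            done = true;                          // written to the database
        } catch (Exception e) {
            done = errorQueue.offer(tuple, e);    // parked in the error queue, or not
        }
        // Exactly one collector call per tuple, and no emit from the last bolt.
        if (done) {
            collector.ack(tuple);
        } else {
            collector.fail(tuple);                // ask the spout to retry
        }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        execute("ok", t -> { }, (t, e) -> false, collector);
        execute("parked", t -> { throw new Exception("db"); }, (t, e) -> true, collector);
        execute("retry", t -> { throw new Exception("db"); }, (t, e) -> false, collector);
        System.out.println(collector.calls); // [ack:ok, ack:parked, fail:retry]
    }
}
```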





Re: Storm kafka spout offset going back

Posted by Stig Rohde Døssing <sr...@apache.org>.
Just to make sure I understand:

This is your topology:
KafkaSpout --> AvroDeserializerBolt --> DataBaseInsertBolt

The bolt you posted the execute method for is the DataBaseInsertBolt, right?
What are these statements for if this is the last bolt in the topology?
"super.getOutputCollector().emit(tuple, new Values(fullMessage));"

Are the topics you mention in problems 1 and 2 the same topic? Essentially,
what I'm asking is whether the topic that is stuck is also the one with
failures that is starting over from an old offset.

Can you post your RetryService configuration?

You talked about moving tuples to an error queue if they fail
deserialization in the Avro bolt. Can you post that execute method too?

2017-09-01 20:14 GMT+02:00 pradeep s <sr...@gmail.com>:

> Thanks, Stig, for the response. I can give some more detail on the issue
> we are facing now.
> For any database failure, we retry the tuple up to 10 times.
> Database failures are mostly caused by parent-child relations, since we are
> processing out of order.
> Our consumer group has more than 10 topics, and each topic corresponds to
> one table. For example, topics A, B and C in a group correspond
> to tables A, B and C in the database.
> Here, table A is the parent, and tables B and C are child
> tables.
> Spout parallelism is set to 50 and each topic has 50 partitions. These 50
> threads go round-robin across all the topics in the group.
>
> Issues observed with the current setup are:
>
> 1) One partition of one topic alone gets stuck. The lag on all the other
> partitions is cleared.
>
> 2) Whichever topic had failures earlier goes back to an old offset.
>
>
> DB Bolt Execute Method below
> =======================
> exceptionCount will have a value greater than 0 once the message is moved
> to the error queue. In that case I acknowledge the message. In other cases
> I fail the tuple.
> There is no downstream bolt for this. This is the final bolt in the
> topology.
>
>     @Override
>     public void execute(final Tuple tuple) {
>         String fullMessage = (String) tuple.getValueByField(EXTRACTED_MESSAGE);
>         GGMessageDTO ggMessage = (GGMessageDTO) tuple.getValueByField(GG_MESSAGE);
>         try {
>             // Call to handler for generating Sql
>             Date date = new Date();
>             super.getMessageHandler().handleMessage(ggMessage, super.getGenericMessageDAO());
>             super.getOutputCollector().emit(tuple, new Values(fullMessage));
>             super.getOutputCollector().ack(tuple);
>             LOGGER.info("DbActionBolt Ack time in ms: {}", new Date().getTime() - date.getTime());
>         } catch (Exception e) {
>             LOGGER.error("DB bolt exception occurred from Aurora : ", e);
>             int exceptionCount = handleException(fullMessage, ggMessage, e, isNormalProcessing);
>             if (exceptionCount != -1) {
>                 // If message write is success acknowledge the message so
>                 // that it will be removed from kafka queue
>                 super.getOutputCollector().emit(tuple, new Values(fullMessage));
>                 super.getOutputCollector().ack(tuple);
>             } else {
>                 super.getOutputCollector().reportError(e);
>                 super.getOutputCollector().fail(tuple);
>             }
>         }
>     }

Re: Storm kafka spout offset going back

Posted by pradeep s <sr...@gmail.com>.
Thanks, Stig, for the response. I can give some more detail on the issue we
are facing now.
For any database failure, we retry the tuple up to 10 times.
Database failures are mostly caused by parent-child relations, since we are
processing out of order.
Our consumer group has more than 10 topics, and each topic corresponds to
one table. For example, topics A, B and C in a group correspond
to tables A, B and C in the database.
Here, table A is the parent, and tables B and C are child
tables.
Spout parallelism is set to 50 and each topic has 50 partitions. These 50
threads go round-robin across all the topics in the group.

Issues observed with the current setup are:

1) One partition of one topic alone gets stuck. The lag on all the other
partitions is cleared.

2) Whichever topic had failures earlier goes back to an old offset.


DB Bolt Execute Method below
=======================
exceptionCount will have a value greater than 0 once the message is moved
to the error queue. In that case I acknowledge the message. In other cases
I fail the tuple.
There is no downstream bolt for this. This is the final bolt in the
topology.

    @Override
    public void execute(final Tuple tuple) {
        String fullMessage = (String) tuple.getValueByField(EXTRACTED_MESSAGE);
        GGMessageDTO ggMessage = (GGMessageDTO) tuple.getValueByField(GG_MESSAGE);
        try {
            // Call to handler for generating Sql
            Date date = new Date();
            super.getMessageHandler().handleMessage(ggMessage, super.getGenericMessageDAO());
            super.getOutputCollector().emit(tuple, new Values(fullMessage));
            super.getOutputCollector().ack(tuple);
            LOGGER.info("DbActionBolt Ack time in ms: {}", new Date().getTime() - date.getTime());
        } catch (Exception e) {
            LOGGER.error("DB bolt exception occurred from Aurora : ", e);
            int exceptionCount = handleException(fullMessage, ggMessage, e, isNormalProcessing);
            if (exceptionCount != -1) {
                // If message write is success acknowledge the message so
                // that it will be removed from kafka queue
                super.getOutputCollector().emit(tuple, new Values(fullMessage));
                super.getOutputCollector().ack(tuple);
            } else {
                super.getOutputCollector().reportError(e);
                super.getOutputCollector().fail(tuple);
            }
        }
    }
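
The ack-or-fail decision described above (retry up to 10 times, then move the message to the error queue and ack it) could be sketched in plain Java as follows. This is only an illustration under stated assumptions: RetryDecisionSketch, Action and onFailure are hypothetical names, and this is not the actual handleException from the bolt, whose implementation was not posted. It also assumes failures of the same message always reach the same bolt task, which an in-memory counter needs in order to work.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a bounded-retry decision; not the poster's handleException. */
class RetryDecisionSketch {
    enum Action { FAIL_FOR_RETRY, MOVE_TO_ERROR_QUEUE_AND_ACK }

    private final int maxRetries;
    // Failure count per message key (assumed to uniquely identify a Kafka record).
    private final Map<String, Integer> failures = new HashMap<>();

    RetryDecisionSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Called each time processing of the given message fails. */
    Action onFailure(String messageKey) {
        int count = failures.merge(messageKey, 1, Integer::sum);
        if (count > maxRetries) {
            // The initial attempt plus maxRetries retries have all failed:
            // give up, so the caller should write to the error queue and ack.
            failures.remove(messageKey);
            return Action.MOVE_TO_ERROR_QUEUE_AND_ACK;
        }
        // Still within the retry budget: the caller should fail the tuple.
        return Action.FAIL_FOR_RETRY;
    }
}
```

With maxRetries set to 10, the first ten failures of a message return FAIL_FOR_RETRY and the eleventh returns MOVE_TO_ERROR_QUEUE_AND_ACK.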

On Fri, Sep 1, 2017 at 9:59 AM, Stig Rohde Døssing <sr...@apache.org> wrote:

> Hi Pradeep,
>
> When you move the message to an error queue, is this happening from inside
> the Avro bolt or are you emitting a tuple? Can you verify that the tuple is
> being acked in the Avro bolt exactly once (double acking will cause the
> tuple to fail)?
>
> Storm will ack messages on the spout as long as all edges in the tuple
> tree are acked, and the topology message timeout hasn't expired before this
> occurs.
>
> For example, if the Kafka spout emits t0 and your AvroDeserializerBolt is
> the only bolt consuming from the spout, the bolt will receive t0 and must
> ack it exactly once. If the AvroDeserializerBolt emits any tuples anchored
> to t0 (using any of the
> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/task/OutputCollector.html
> methods that take a Tuple anchor), the downstream bolts must ack those
> exactly once too. Let's say the Avro bolt emits t0_0 and t0_1 based on t0.
> The root tuple on the spout is only acked if t0, t0_0 and t0_1 are acked
> once each, and they all get acked before the message timeout elapses.
>
> Depending on your throughput this may be infeasible, but you might try
> enabling debug logging
> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html#setDebug-boolean-
> which will let you tell whether the tuple is being acked on the spout.
>
> If the tuple is being acked on the spout, you might want to look at some
> of the logs from this method
> https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L64.
> They should show you what the spout is doing internally. Keep in mind that
> the spout can only commit e.g. offset 10 if offsets 0-9 have all been
> acked/committed, so if an earlier tuple failed and is waiting for retry
> when you restart, that could also cause this.

Re: Storm kafka spout offset going back

Posted by Stig Rohde Døssing <sr...@apache.org>.
Hi Pradeep,

When you move the message to an error queue, is this happening from inside
the Avro bolt or are you emitting a tuple? Can you verify that the tuple is
being acked in the Avro bolt exactly once (double acking will cause the
tuple to fail)?

Storm will ack messages on the spout as long as all edges in the tuple tree
are acked, and the topology message timeout hasn't expired before this
occurs.

For example, if the Kafka spout emits t0 and your AvroDeserializerBolt is
the only bolt consuming from the spout, the bolt will receive t0 and must
ack it exactly once. If the AvroDeserializerBolt emits any tuples anchored
to t0 (using any of the
https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/task/OutputCollector.html
methods that take a Tuple anchor), the downstream bolts must ack those
exactly once too. Let's say the Avro bolt emits t0_0 and t0_1 based on t0.
The root tuple on the spout is only acked if t0, t0_0 and t0_1 are acked
once each, and they all get acked before the message timeout elapses.
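
To illustrate why each tuple must be acked exactly once, here is a toy model in plain Java of the XOR-based bookkeeping Storm's acker is documented to use for a tuple tree. It is a simplified sketch, not Storm's actual classes: TupleTreeSketch and its methods are hypothetical names, and the small ids stand in for the random 64-bit ids Storm generates.

```java
/** Toy model of ack tracking for one tuple tree; not Storm's real acker. */
class TupleTreeSketch {
    // XOR of every tuple id that has been emitted or acked in this tree.
    private long ackVal = 0L;

    /** Record that a tuple with this id was emitted into the tree. */
    void emitted(long tupleId) {
        ackVal ^= tupleId;
    }

    /** Record that a tuple with this id was acked. */
    void acked(long tupleId) {
        ackVal ^= tupleId;
    }

    /** The tree is complete once every id was XORed in exactly twice. */
    boolean isComplete() {
        return ackVal == 0L;
    }

    public static void main(String[] args) {
        long t0 = 0x1L, t0_0 = 0x2L, t0_1 = 0x4L; // arbitrary distinct ids
        TupleTreeSketch tree = new TupleTreeSketch();
        tree.emitted(t0);   // spout emits t0
        tree.emitted(t0_0); // Avro bolt emits two tuples anchored to t0
        tree.emitted(t0_1);
        tree.acked(t0);     // Avro bolt acks t0 once
        tree.acked(t0_0);   // downstream bolt acks each child once
        tree.acked(t0_1);
        System.out.println("complete after single acks: " + tree.isComplete());
        tree.acked(t0);     // accidental second ack of t0
        System.out.println("complete after double ack: " + tree.isComplete());
    }
}
```

In this model a second ack of t0 flips the value away from zero again, so the tree never looks complete; in a real topology the tuple would then presumably fail once the message timeout elapses.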

Depending on your throughput this may be infeasible, but you might try
enabling debug logging
https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html#setDebug-boolean-
which will let you tell whether the tuple is being acked on the spout.

If the tuple is being acked on the spout, you might want to look at some of
the logs from this method
https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L64.
They should show you what the spout is doing internally. Keep in mind that
the spout can only commit e.g. offset 10 if offsets 0-9 have all been
acked/committed, so if an earlier tuple failed and is waiting for retry
when you restart, that could also cause this.
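
The contiguity rule can be sketched in plain Java as below. This is a simplified illustration, not the real OffsetManager code: CommitOffsetSketch and highestCommittable are hypothetical names, and the sketch returns the last processed offset rather than the "next offset to read" that Kafka actually stores on commit.

```java
import java.util.TreeSet;

/** Simplified model of finding how far a partition's commit can advance. */
class CommitOffsetSketch {
    /**
     * Returns the highest offset N such that every offset in
     * (lastCommitted, N] has been acked, or lastCommitted if the
     * very next offset is still pending.
     */
    static long highestCommittable(long lastCommitted, TreeSet<Long> acked) {
        long highest = lastCommitted;
        for (long offset : acked) { // TreeSet iterates in ascending order
            if (offset <= highest) {
                continue;           // already covered by an earlier commit
            }
            if (offset != highest + 1) {
                break;              // gap: an earlier tuple is unacked or awaiting retry
            }
            highest = offset;
        }
        return highest;
    }

    public static void main(String[] args) {
        TreeSet<Long> acked = new TreeSet<>();
        for (long offset = 0; offset <= 10; offset++) {
            if (offset != 4) {
                acked.add(offset);  // offset 4 failed and is waiting for retry
            }
        }
        System.out.println(highestCommittable(-1L, acked)); // stuck just before offset 4
        acked.add(4L);                                      // the retry finally succeeds
        System.out.println(highestCommittable(-1L, acked));
    }
}
```

If the topology is restarted while offset 4 is still pending, only offsets up to 3 count as committed in this model, so consumption would resume from offset 4 even though 5-10 were already processed.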

2017-09-01 7:04 GMT+02:00 pradeep s <sr...@gmail.com>:

> Hi,
> I am using Storm 1.1.0, storm-kafka-client version 1.1.1, and the Kafka
> server is 0.10.1.1.
>
> The Kafka spout polling strategy used is UNCOMMITTED_EARLIEST.
>
> The message flow is as below; it is a normal topology:
>
> KafkaSpout --> AvroDeserializerBolt --> DataBaseInsertBolt
>
> If a message fails Avro deserialization, I move the message to an
> error queue and acknowledge it from the Avro bolt. This message is not
> emitted to the database bolt.
>
> But I observe that after I restart the topology, the offset for the topic
> goes back to an old offset.
>
> Will Kafka commit the offset only if the message is acked by all bolts?
>
> Is this why the offset goes back to a previous value?
>
> Thanks
> Pradeep
>