Posted to user@storm.apache.org by Yair Weinberger <ya...@gmail.com> on 2014/11/27 09:04:18 UTC

TransactionalTridentKafkaSpout overshot end offset

Following Evan Spark's question from July, we have also run into this issue.
At some point the spout worker crashes with the error below in the log.
Any ideas why this happens?
Is it even correct to throw a RuntimeException here?

2014-11-27 05:38:09 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Error when re-emitting batch. overshot the end offset
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__4822$fn__4834$fn__4881.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__455.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_33]
Caused by: java.lang.RuntimeException: Error when re-emitting batch. overshot the end offset
        at storm.kafka.trident.TridentKafkaEmitter.reEmitPartitionBatch(TridentKafkaEmitter.java:162) ~[stormjar.jar:na]
        at storm.kafka.trident.TridentKafkaEmitter.access$500(TridentKafkaEmitter.java:46) ~[stormjar.jar:na]
        at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatch(TridentKafkaEmitter.java:243) ~[stormjar.jar:na]
        at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatch(TridentKafkaEmitter.java:226) ~[stormjar.jar:na]
        at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:133) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__4822$tuple_action_fn__4824.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__4745.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$clojure_handler$reify__833.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        ... 6 common frames omitted

This seems to be the source of this exception:

    /**
     * re-emit the batch described by the meta data provided
     *
     * @param attempt
     * @param collector
     * @param partition
     * @param meta
     */
    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
        LOG.info("re-emitting batch, attempt " + attempt);
        String instanceId = (String) meta.get("instanceId");
        if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
            SimpleConsumer consumer = _connections.register(partition);
            long offset = (Long) meta.get("offset");
            long nextOffset = (Long) meta.get("nextOffset");
            ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
            for (MessageAndOffset msg : msgs) {
                if (offset == nextOffset) {
                    break;
                }
                if (offset > nextOffset) {
                    throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
                }
                emit(collector, msg.message());
                offset = msg.nextOffset();
            }
        }
    }
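
For anyone who wants to poke at this loop outside a running topology, here is a minimal sketch of its control flow. It is not storm-kafka code: `ReplayLoopSketch` and `Msg` are hypothetical stand-ins for the emitter and for Kafka's `MessageAndOffset`, and `nextOffset()` is assumed to be simply `offset + 1`. It shows the loop replaying the stored range when the fetched messages start at the stored offset, and throwing when they start somewhere else.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the replay loop in reEmitPartitionBatch.
// None of these names come from storm-kafka; they are illustrative only.
class ReplayLoopSketch {

    // Hypothetical substitute for Kafka's MessageAndOffset.
    static final class Msg {
        final long offset;     // position of this message in the partition
        final String payload;

        Msg(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }

        // Assumption: offsets are dense, so the next message is at offset + 1.
        long nextOffset() {
            return offset + 1;
        }
    }

    // Same control flow as the quoted loop: emit until the running offset
    // reaches nextOffset, and throw if it ever jumps past it.
    static List<String> replay(List<Msg> fetched, long offset, long nextOffset) {
        List<String> emitted = new ArrayList<>();
        for (Msg msg : fetched) {
            if (offset == nextOffset) {
                break;
            }
            if (offset > nextOffset) {
                throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
            }
            emitted.add(msg.payload);
            offset = msg.nextOffset();
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The fetch starts at the stored offset: the batch [5, 7) is replayed.
        List<Msg> contiguous = Arrays.asList(new Msg(5, "a"), new Msg(6, "b"), new Msg(7, "c"));
        System.out.println(replay(contiguous, 5, 7)); // prints [a, b]

        // The fetch starts far beyond the stored range: the second iteration throws.
        List<Msg> shifted = Arrays.asList(new Msg(100, "x"), new Msg(101, "y"));
        try {
            replay(shifted, 5, 7);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that the loop never compares `offset` with the offset of the message it is about to emit, so the exception can only be reached if the fetched messages do not start where the stored metadata says they should.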

Re: TransactionalTridentKafkaSpout overshot end offset

Posted by Yair Weinberger <ya...@gmail.com>.
Sorry, my previous message was sent before it was finished (partway through
describing our workaround). Please see the full post-mortem below. I hope it
will be of value to readers.
Sorry again for the spam.

For the next person who searches for this error, here is our post-mortem
analysis. I'll follow up by opening an issue in JIRA.

*TL;DR*: there is a bug in Kafka Spout: if it tries to re-emit a batch that
is no longer on Kafka, it throws a RuntimeException and kills the entire
topology.

Here is the more detailed analysis:

   1. A fault in our code caused a batch to be re-emitted over and over,
   indefinitely.
   2. Kafka Spout re-emits the batch and, by design, has no limit on how
   many times a batch can be re-emitted (which is OK).
   3. Eventually, the offset of this batch no longer exists on Kafka.
   4. This is where the real action starts (code snippets are taken from the
   v0.9.2 tag): Kafka Spout uses KafkaUtils.fetchMessages to get the
   batch from Kafka.
   5. Now let us look at the relevant code from fetchMessages:

      if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
          long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
          LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                  "retrying with default start offset time from configuration. " +
                  "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
          offset = startOffset;
      }

   6. So if the offset no longer exists on Kafka, we fetch from a different
   offset instead (I am not sure why this is a good idea). In practice, this
   will be a much larger offset than the one originally requested.
   7. Back in the Kafka Spout code, which has now received messages with a
   much larger offset than it requested, the behaviour is really
   interesting:

      for (MessageAndOffset msg : msgs) {
          if (offset == nextOffset) {
              break;
          }
          if (offset > nextOffset) {
              throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
          }
          emit(collector, msg.message());
          offset = msg.nextOffset();
      }

   8. As you can see, on the first iteration offset still holds the
   originally requested value, so both checks pass and a message from a
   completely different offset *is emitted*.
   9. Then offset is updated to the nextOffset of that message, which is of
   course very large, so the next iteration of the loop throws the
   "overshot the end offset" error.
   10. Our workaround was to add a check in the loop that each message is
   indeed at the expected offset, and to break out of the loop if it is not:

      if (offset != msg.offset()) {
          /*
           * the offset of the message should always be equal to the
           * nextOffset of the previous message (or the starting offset)
           */
          LOG.error(String.format("The offset of the message received (%d) does not equal the expected offset (%d)", msg.offset(), offset));
          break;
      }
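
The steps above can be reproduced in a small simulation. This is only a sketch under stated assumptions, not the storm-kafka implementation: `GuardedReplaySketch`, `resolveFetchOffset` and `guardedReplay` are hypothetical names, the partition is modelled as a `TreeMap` from offset to payload, offsets are assumed to be dense, and the fallback is assumed to resolve to the earliest retained offset (in the real spout it depends on `config.startOffsetTime`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulation of the fetch fallback plus the guarded replay loop.
// All names here are illustrative; none of them exist in storm-kafka.
class GuardedReplaySketch {

    // Mimics the fallback in fetchMessages: if the requested offset is no
    // longer retained, fetch from the earliest retained offset instead.
    static long resolveFetchOffset(TreeMap<Long, String> log, long requested) {
        if (log.isEmpty() || log.containsKey(requested)) {
            return requested;
        }
        return log.firstKey();
    }

    // The replay loop with the extra check from the workaround placed first.
    static List<String> guardedReplay(TreeMap<Long, String> log, long offset, long nextOffset) {
        List<String> emitted = new ArrayList<>();
        long fetchFrom = resolveFetchOffset(log, offset);
        for (Map.Entry<Long, String> msg : log.tailMap(fetchFrom, true).entrySet()) {
            long msgOffset = msg.getKey();
            if (offset != msgOffset) {
                // The fetched message is not the one we expected: stop replaying.
                break;
            }
            if (offset == nextOffset) {
                break;
            }
            if (offset > nextOffset) {
                throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
            }
            emitted.add(msg.getValue());
            offset = msgOffset + 1; // assumption: dense offsets
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The batch [5, 7) was stored, but only offsets 100..102 are still retained.
        TreeMap<Long, String> trimmed = new TreeMap<>();
        trimmed.put(100L, "x");
        trimmed.put(101L, "y");
        trimmed.put(102L, "z");
        System.out.println(resolveFetchOffset(trimmed, 5)); // prints 100
        System.out.println(guardedReplay(trimmed, 5, 7));   // prints []
    }
}
```

With the guard in place the replay no longer throws, but the re-emitted batch comes out empty, since the original messages are gone. Whether an empty replay is acceptable depends on what the topology expects from a replayed batch.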

Yair Weinberger
alooma.io


Re: TransactionalTridentKafkaSpout overshot end offset

Posted by Yair Weinberger <ya...@gmail.com>.
For the next person who searches for this error, here is our post-mortem
analysis. I'll follow up by opening an issue in JIRA.

*TL;DR*: there is a bug in Kafka Spout: if it tries to re-emit a batch that
is no longer on Kafka, it throws a RuntimeException and kills the entire
topology.

Here is the more detailed analysis:

   1. A fault in our code caused a batch to be re-emitted over and over,
   indefinitely.
   2. Kafka Spout re-emits the batch and, by design, has no limit on how
   many times a batch can be re-emitted (which is OK).
   3. Eventually, the offset of this batch no longer exists on Kafka.
   4. This is where the real action starts (code snippets are taken from the
   v0.9.2 tag): Kafka Spout uses KafkaUtils.fetchMessages to get the
   batch from Kafka.
   5. Now let us look at the relevant code from fetchMessages:

      if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
          long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
          LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                  "retrying with default start offset time from configuration. " +
                  "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
          offset = startOffset;
      }

   6. So if the offset no longer exists on Kafka, we fetch from a different
   offset instead (I am not sure why this is a good idea). In practice, this
   will be a much larger offset than the one originally requested.
   7. Back in the Kafka Spout code, which has now received messages with a
   much larger offset than it requested, the behaviour is really
   interesting:

      for (MessageAndOffset msg : msgs) {
          if (offset != msg.offset()) {
              /*
               * the offset of the message should always be equal to the
               * nextOffset of the previous message (or the starting offset)
               */
              LOG.error(String.format("The offset of the message received (%d) does not equal the expected offset (%d)", msg.offset(), offset));
              break;
          }
          if (offset == nextOffset) {
              break;
          }
          if (offset > nextOffset) {
              throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
          }
          emit(collector, msg.message());
          offset = msg.nextOffset();
      }

