Posted to dev@storm.apache.org by "Yair Weinberger (JIRA)" <ji...@apache.org> on 2014/11/30 09:37:12 UTC

[jira] [Updated] (STORM-579) Trident KafkaSpout throws RunTimeException when trying to re-emit a batch that is no longer in Kafka

     [ https://issues.apache.org/jira/browse/STORM-579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yair Weinberger updated STORM-579:
----------------------------------
    Description: 
On version 0.9.2, a message from the future is emitted once, then a RuntimeException is thrown from the KafkaSpout.
On version 0.9.3 an UpdateOffsetException appears to be thrown instead, but this still seems to cause the topology to be killed.

There is faulty code that causes a batch to be retransmitted indefinitely.
The Kafka Spout re-emits the batch and, by design, places no limit on how many times it will be re-emitted (which is fine on its own).
At some point the offset of this batch no longer exists on Kafka (for example, after log retention has deleted the segment).
That is when the real trouble starts (code snippets are taken from the v0.9.2 tag): the Kafka Spout uses KafkaUtils.fetchMessages to get the batch from Kafka.
Let us look at the relevant code from fetchMessages:

if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
             "retrying with default start offset time from configuration. " +
             "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
    offset = startOffset;
}
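
One alternative (a sketch only, assuming a hypothetical StaleOffsetException type; this is not what storm-kafka actually does) would be to fail fast here and let the caller decide whether to skip or fail the batch, rather than silently substituting an offset:

if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE)) {
    // Hypothetical: surface the condition instead of fetching from an
    // unrelated offset, so a re-emit can never pick up "messages from the future".
    throw new StaleOffsetException("Requested offset [" + offset +
            "] is no longer available for partition [" + partitionId + "]");
}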

So if the offset no longer exists on Kafka, we fetch from an entirely different offset (it is not clear why this is a good idea). In practice this will be a much larger offset than the one we originally tried to retrieve.
Now let us go back to the Kafka Spout code. Given messages with a much larger offset than what it originally requested, its behaviour is really interesting:
for (MessageAndOffset msg : msgs) {
    if (offset == nextOffset) {
        break;
    }
    if (offset > nextOffset) {
        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
    }
    emit(collector, msg.message());
    offset = msg.nextOffset();
}
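
A defensive variant (again just a sketch, not the actual fix) would be to check that the fetched messages actually start at the requested offset before emitting anything:

for (MessageAndOffset msg : msgs) {
    if (offset == nextOffset) {
        break;  // the whole batch has been re-emitted
    }
    if (msg.offset() != offset) {
        // The fetch did not start where we asked (e.g. the fallback above
        // substituted config.startOffsetTime); abort the re-emit cleanly
        // instead of emitting unrelated messages.
        throw new RuntimeException("Re-emit expected offset " + offset
                + " but got " + msg.offset());
    }
    emit(collector, msg.message());
    offset = msg.nextOffset();
}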
As you can see, on the first iteration nothing has touched offset, so a message from a completely different offset *is emitted*.
Then offset is updated to the current message's nextOffset, which is of course far past the end of the batch, so the next loop iteration triggers the "overshot the end offset" RuntimeException.
For example, if the batch spans offsets 100..110 but retention has advanced the earliest available offset to 5000, the first iteration emits the message at offset 5000 and sets offset to 5001; the second iteration then throws because 5001 > 110.


> Trident KafkaSpout throws RunTimeException when trying to re-emit a batch that is no longer in Kafka
> ----------------------------------------------------------------------------------------------------
>
>                 Key: STORM-579
>                 URL: https://issues.apache.org/jira/browse/STORM-579
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.9.2-incubating, 0.9.3
>            Reporter: Yair Weinberger
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)