Posted to user@storm.apache.org by Adrian Landman <ad...@gmail.com> on 2014/07/24 21:27:07 UTC

KafkaSpout offsets

In the nathanmarz/storm-contrib project there was a KafkaConfig that had a
forceOffsetTime.  In our code someone had documented that calling this with
different values would affect the offsets in the following way:

-2 Will start at the beginning (earliest message) of the topic
-1 Will start at the end (latest message) of the topic
-3 Will start where the spout left off
And anything >0 will start at the specified offset.

In the new project external/storm-kafka there is also a KafkaConfig and I
see that it exposes
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = 100000;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;

Does this mean that by default the spout will start at the beginning of the
topic?  What does forceFromStart do?  If we want to start from whatever
offset the spout was last processing, is there any way to do this?
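
For reference, the legacy forceOffsetTime values listed above can be written
down as a small lookup. This is only a sketch, not code from storm-contrib:
the class and method names are hypothetical. The values -2 and -1 match what
Kafka's OffsetRequest.EarliestTime() and LatestTime() return; the meaning of
-3 and of positive values comes solely from the comment quoted above and is
an assumption here.

```java
// Hypothetical helper for illustration; not part of storm-contrib or storm-kafka.
class ForceOffsetTimeSketch {
    static final long EARLIEST = -2L; // same value as kafka.api.OffsetRequest.EarliestTime()
    static final long LATEST = -1L;   // same value as kafka.api.OffsetRequest.LatestTime()
    static final long RESUME = -3L;   // assumption: taken from the comment quoted above

    // Describe what the quoted comment says each forceOffsetTime value does.
    static String describe(long forceOffsetTime) {
        if (forceOffsetTime == EARLIEST) return "start at the earliest message";
        if (forceOffsetTime == LATEST) return "start at the latest message";
        if (forceOffsetTime == RESUME) return "start where the spout left off";
        if (forceOffsetTime > 0) return "start at offset " + forceOffsetTime;
        return "undocumented value";
    }

    public static void main(String[] args) {
        for (long v : new long[] {-2L, -1L, -3L, 42L}) {
            System.out.println(v + " -> " + describe(v));
        }
    }
}
```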

Re: KafkaSpout offsets

Posted by Harsha <st...@harsha.io>.
"Start at the first (oldest) message on the topic: set
forceFromStart = true" Yes

"Start at the last (newest) message on the topic : ?"

The current version of KafkaSpout doesn't offer this config.
The Kafka OffsetRequest API does provide this option:

[1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest

Can you please file a JIRA for this?

"Start at the last saved offset : Don't change the config
defaults" Yes

"Start at an explicit offset: ? (I don't envision needing to
use this, but just in case)"

As far as I know there is no API to do this in Kafka itself.
Here is an approach that involves changing offsets in
ZooKeeper:

[2] https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowcanIrewindtheoffsetintheconsumer?

IMO this is not recommended unless it's done very rarely, to
reprocess data.



 "public boolean useStartOffsetTimeIfOffsetOutOfRange = true if
an offset is found "

This option exists in case the user has not read from the Kafka
queue and log.retention.hours has elapsed. In that case Kafka
has deleted the older data, and ZooKeeper holds an older offset
that points to the deleted data. If we start from this offset,
it will throw an OffsetOutOfRangeException. To work around this
scenario, if it throws such an exception we start from the
beginning of the queue.
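
The fallback can be sketched roughly as follows. This is not the storm-kafka
source, only a self-contained illustration: the class, the stand-in
exception, and the fetchFrom helper are all hypothetical, and falling back
to the earliest offset assumes startOffsetTime is left at its default of
EarliestTime().

```java
// Hypothetical sketch of the out-of-range fallback; not the storm-kafka implementation.
class OutOfRangeFallbackSketch {
    // Stand-in for Kafka's OffsetOutOfRangeException (illustration only).
    static class OffsetOutOfRange extends RuntimeException {}

    // Pretend fetch: fails when the requested offset is outside [earliest, latest].
    static long fetchFrom(long offset, long earliest, long latest) {
        if (offset < earliest || offset > latest) {
            throw new OffsetOutOfRange();
        }
        return offset;
    }

    // Try the saved offset first; if it points at deleted data and the flag is
    // set, restart from the earliest offset the broker still has.
    static long resolveStart(long savedOffset, long earliest, long latest,
                             boolean useStartOffsetTimeIfOffsetOutOfRange) {
        try {
            return fetchFrom(savedOffset, earliest, latest);
        } catch (OffsetOutOfRange e) {
            if (useStartOffsetTimeIfOffsetOutOfRange) {
                return earliest;
            }
            throw e; // flag disabled: surface the error to the caller
        }
    }
}
```

For example, with a saved offset of 5 and a log that now only covers offsets
100 to 200, resolveStart(5, 100, 200, true) returns 100, while passing false
lets the exception propagate.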




References

1. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
2. https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowcanIrewindtheoffsetintheconsumer
3. mailto:storm@harsha.io

Re: KafkaSpout offsets

Posted by Adrian Landman <ad...@gmail.com>.
Thanks!  That helps clear things up some.  So if forceFromStart is true it
will force it to start at the beginning.  If nothing is changed it will try
to start from the last committed offset, but if there is no committed
offset, where will it start?  What if there is a saved offset, but we want
to force it to start at the end?  Or what if we want to force a particular
offset, not the last saved one?  I'm guessing that, based on public boolean
useStartOffsetTimeIfOffsetOutOfRange = true, if an offset is found that is
out of range, it will start at the beginning offset?

Essentially, I want to be able to specify the following conditions:
Start at the first (oldest) message on the topic: set forceFromStart = true
Start at the last (newest) message on the topic : ?
Start at the last saved offset : Don't change the config defaults
Start at an explicit offset: ? (I don't envision needing to use this, but
just in case)




Re: KafkaSpout offsets

Posted by Harsha <st...@harsha.io>.
Hi Adrian,

If you set forceFromStart to true, it calls KafkaApi.Offset to
get the earliest time, which finds the beginning of the Kafka
logs and starts streaming from there. By default this is set
to false, and it makes a request to Kafka to find the last
committed offset and streams from there. You can control how
often the Kafka offset is committed with
SpoutConfig.stateUpdateIntervalMs; by default it's 2000 ms.
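
Roughly, the choice described above looks like the sketch below. It is not
the spout's actual code: the class and method are hypothetical, and the
noCommitFallback parameter is a placeholder for the case where no offset has
been committed yet, which is not covered above.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the start-offset choice; not the storm-kafka implementation.
class StartOffsetSketch {
    static long chooseStart(boolean forceFromStart,
                            long earliestOffset,
                            OptionalLong committedOffset,
                            long noCommitFallback) {
        if (forceFromStart) {
            // Ignore any committed offset and begin at the start of the log.
            return earliestOffset;
        }
        // Default: resume from the last committed offset when one exists.
        // noCommitFallback is an assumption standing in for the undocumented case.
        return committedOffset.orElse(noCommitFallback);
    }
}
```

With a committed offset of 50, chooseStart(false, 10, OptionalLong.of(50), 99)
returns 50, and chooseStart(true, 10, OptionalLong.of(50), 99) returns 10.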

-Harsha