Posted to dev@samza.apache.org by Claudio Martins <cl...@mobileaware.com> on 2014/04/17 00:03:04 UTC

starting processing from the last checkpoint

Hi guys,

I'm having a hard time trying to figure out why my Samza job does not start
processing Kafka messages from the last checkpoint.

I have the task configured as follows:

systems.kafka.consumer.auto.offset.reset=smallest
streams.topic-name.consumer.reset.offset=false

However, when the job runs, it doesn't process anything from the last
committed offset, only the upcoming messages.

I stop the job, load some messages into the topic, and start the job again.
The messages loaded while it was stopped are never processed; only messages
that arrive after the restart are.

Is there anything I am missing here?

Just to make it clear: I do not want to start processing from the beginning
of the topic. I want to process from the last checkpoint.


Thanks,

- Claudio Martins
linkedin: www.linkedin.com/in/martinsclaudio

Re: starting processing from the last checkpoint

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Claudio,

The logs show that your input topic is metrics-topic. The container is
reading this topic from the upcoming offset, not the oldest:

2014-04-22 19:46:47 KafkaSystemAdmin$ [INFO] Got metadata: Map(metrics-topic -> SystemStreamMetadata [streamName=metrics-topic, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=0, newestOffset=1704, upcomingOffset=1705], Partition [partition=1]=SystemStreamPartitionMetadata [oldestOffset=0, newestOffset=1602, upcomingOffset=1603]}])

...
2014-04-22 19:47:20 GetOffset [INFO] Validating offset 1705 for topic and partition [metrics-topic,0]
2014-04-22 19:47:20 GetOffset [INFO] Able to successfully read from offset 1705 for topic and partition [metrics-topic,0]. Using it to instantiate consumer.
2014-04-22 19:47:20 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
2014-04-22 19:47:20 GetOffset [INFO] Validating offset 1603 for topic and partition [metrics-topic,1]
2014-04-22 19:47:20 GetOffset [INFO] Able to successfully read from offset 1603 for topic and partition [metrics-topic,1]. Using it to instantiate consumer.
...
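The check Chris does by eye here (is each "Validating offset" equal to the partition's upcomingOffset?) can be scripted. This is a hypothetical helper, not part of Samza: classify_start_offsets and its regexes are written against the exact log lines quoted above, tolerate email line wrapping, assume a single input topic, and may need adjusting for other Samza versions.

```python
import re
from typing import Dict

# Patterns written against the 0.7-era log lines quoted above; \s+ and \s*
# tolerate wrapped lines. Other Samza versions may word these differently.
METADATA_RE = re.compile(
    r"Partition\s+\[partition=(\d+)\]=SystemStreamPartitionMetadata\s*"
    r"\[oldestOffset=(\d+),\s*newestOffset=(\d+),\s*upcomingOffset=(\d+)\]"
)
VALIDATING_RE = re.compile(
    r"Validating offset (\d+) for topic and\s+partition \[([^,\]]+),(\d+)\]"
)


def classify_start_offsets(log_text: str) -> Dict[int, str]:
    """Map each partition to where its consumer started (single topic assumed)."""
    metadata = {
        int(part): {"oldest": int(oldest), "upcoming": int(upcoming)}
        for part, oldest, _newest, upcoming in METADATA_RE.findall(log_text)
    }
    result: Dict[int, str] = {}
    for offset, _topic, part in VALIDATING_RE.findall(log_text):
        meta = metadata.get(int(part))
        start = int(offset)
        if meta is None:
            result[int(part)] = "unknown"
        elif start == meta["upcoming"]:
            result[int(part)] = "upcoming"  # everything already in the topic is skipped
        elif start == meta["oldest"]:
            result[int(part)] = "oldest"
        else:
            result[int(part)] = "other"  # e.g. resumed from a checkpoint
    return result
```

Fed the metadata line and the two "Validating offset" lines above, it reports both partition 0 and partition 1 as "upcoming", which is the symptom Claudio describes.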


The reason lies in this log line:


2014-04-22 19:46:47 OffsetManager$ [INFO] No default offset for SystemStream [system=kafka, stream=metrics-topic] defined. Using upcoming.


You haven't defined a default offset for the stream (or system). You can
do this with a stream-level config:

  systems.kafka.streams.metrics-topic.samza.offset.default=oldest

Or a system-level config:

  systems.kafka.samza.offset.default=oldest


The latter (system-level config) sets the default for all streams coming
from the Kafka system, while the former (stream-level) defines the config
only for the metrics-topic input stream.
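The precedence described above can be modelled in a few lines. This is a hypothetical sketch, not Samza's OffsetManager. It assumes that samza.offset.default is consulted only when a partition has no checkpoint, that the stream-level setting overrides the system-level one, that the fallback is upcoming (matching the "Using upcoming" log line), and, as for Kafka, that a checkpoint stores the last processed offset so consumption resumes at the next one. The reset flag is an illustrative stand-in for "ignore the checkpoint".

```python
from typing import Dict, Optional

# Hypothetical model of how a container picks a starting offset for one
# partition. Offsets are simplified to integers; not Samza's actual code.


def default_offset_setting(config: Dict[str, str], system: str, stream: str) -> str:
    """Stream-level default wins over system-level; otherwise 'upcoming'."""
    stream_key = f"systems.{system}.streams.{stream}.samza.offset.default"
    system_key = f"systems.{system}.samza.offset.default"
    return config.get(stream_key, config.get(system_key, "upcoming"))


def starting_offset(
    config: Dict[str, str],
    system: str,
    stream: str,
    oldest: int,
    upcoming: int,
    last_checkpointed: Optional[int] = None,
    reset: bool = False,
) -> int:
    # Assumed: a checkpoint holds the last processed offset, so resume after it.
    if last_checkpointed is not None and not reset:
        return last_checkpointed + 1
    # No usable checkpoint: fall back to the configured default.
    setting = default_offset_setting(config, system, stream)
    if setting == "oldest":
        return oldest
    if setting == "upcoming":
        return upcoming
    raise ValueError(f"unsupported samza.offset.default value: {setting!r}")
```

Under these assumptions, setting oldest does not conflict with resuming from the last checkpoint: partitions that already have a checkpoint still resume after it, and only partitions without one start from the oldest offset. With an empty config and no checkpoint, the model returns 1705 for partition 0, the offset seen in the "Validating offset" log line.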

Cheers,
Chris

On 4/22/14 4:52 PM, "Claudio Martins" <cl...@mobileaware.com> wrote:


Re: starting processing from the last checkpoint

Posted by Claudio Martins <cl...@mobileaware.com>.
Hi Chris, the logs are on this link: http://pastebin.com/WQR7d52V

This topic alone had around 500k messages to process, yet the task did not
receive anything when it was restarted with the "smallest" offset config.

Is there anything I am missing?

- Claudio Martins
Head of Engineering and User Experience
MobileAware USA Inc. / www.mobileaware.com
office: +1  617 986 5060 / mobile: +1 617 480 5288
linkedin: www.linkedin.com/in/martinsclaudio


On Thu, Apr 17, 2014 at 3:45 PM, Chris Riccomini <cr...@linkedin.com> wrote:


Re: starting processing from the last checkpoint

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Claudio,

Hmm. Could you post your logs somewhere we can have a look? The logs
should say whether the checkpoint is used or not, and also provide insight
into why it might not be.

Cheers,
Chris

On 4/17/14 11:19 AM, "Claudio Martins" <cl...@mobileaware.com> wrote:


Re: starting processing from the last checkpoint

Posted by Claudio Martins <cl...@mobileaware.com>.
Hi Chris, thanks for the answer.

I already had those configured:

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory

task.checkpoint.system=kafka

task.checkpoint.replication.factor=1

task.window.ms=5000

task.commit.ms=5000

task.checkpoint.failure.retry.ms=5000

- Claudio Martins


On Thu, Apr 17, 2014 at 2:15 PM, Chris Riccomini <cr...@linkedin.com> wrote:


Re: starting processing from the last checkpoint

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Claudio,

It looks like you're using the old configs from Samza 0.6.0.
Unfortunately, our docs have not yet been updated to reflect the new
configs for 0.7.0. There is a JIRA open for this, though:

  https://issues.apache.org/jira/browse/SAMZA-165

To enable checkpointing, you'll need to set:

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
# assume we have a system defined in the config called "kafka"
task.checkpoint.system=kafka
# can't be higher than the number of brokers in your Kafka grid,
# but should usually be more than one if you have more than one
# broker in your Kafka grid.
task.checkpoint.replication.factor=1
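The comments in the snippet above amount to a few checks that could be run against a job's .properties file before deploying. This is a hypothetical sketch: parse_properties and checkpoint_config_problems are illustrative helpers, not Samza APIs. It assumes a system is declared with systems.<name>.samza.factory, that you supply the broker count yourself, and its property reader is deliberately simpler than java.util.Properties.

```python
from typing import Dict, List

REQUIRED_KEYS = (
    "task.checkpoint.factory",
    "task.checkpoint.system",
    "task.checkpoint.replication.factor",
)


def parse_properties(text: str) -> Dict[str, str]:
    """Simplified reader: key=value lines and #/! comments only.

    Unlike java.util.Properties, it ignores ':' separators, line
    continuations and escapes; lines without '=' are skipped.
    """
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def checkpoint_config_problems(props: Dict[str, str], broker_count: int) -> List[str]:
    """Return human-readable problems with the checkpoint settings."""
    problems = [f"missing or empty: {key}" for key in REQUIRED_KEYS if not props.get(key)]
    system = props.get("task.checkpoint.system")
    # Assumes systems are declared as systems.<name>.samza.factory=...
    if system and f"systems.{system}.samza.factory" not in props:
        problems.append(f"checkpoint system {system!r} is not defined")
    # The replication factor can't exceed the number of brokers.
    factor = props.get("task.checkpoint.replication.factor", "")
    if factor.isdigit() and int(factor) > broker_count:
        problems.append(f"replication factor {factor} exceeds {broker_count} broker(s)")
    return problems
```

One case it catches: if a value ends up on its own line, as can happen when config is pasted through email, the key is left with an empty value (java.util.Properties would read it the same way) and is reported as missing.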


Cheers,
Chris

On 4/16/14 3:03 PM, "Claudio Martins" <cl...@mobileaware.com> wrote:
