Posted to dev@samza.apache.org by Garry Turkington <g....@improvedigital.com> on 2014/02/10 12:18:33 UTC

Using bootstrap streams

Hi,

I was building a task to do some sentiment analysis on incoming data. I have two corpora, one of positive words and one of negative words, that the task needs access to. This seemed like a good fit for bootstrap streams, but I can't get them to work.

I have my job configured with the 3 Kafka topics in task.inputs, and that part seems to work: data sent to any of the topics reaches the task.

But setting up the 2 reference streams as bootstrap streams doesn't seem to be working. Here's the relevant part of the config; I want to read the entire message history each time:

systems.kafka.streams.positive-words.samza.bootstrap=true
systems.kafka.streams.positive-words.samza.reset.offset=true

systems.kafka.streams.negative-words.samza.bootstrap=true
systems.kafka.streams.negative-words.samza.reset.offset=true
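One way to double-check which streams a config like the one above actually marks for bootstrapping is to parse it directly. The sketch below is a hypothetical helper; the class name BootstrapConfigCheck is an illustration and not part of Samza. It assumes only the systems.<system>.streams.<stream>.samza.bootstrap key layout shown above, and it uses plain java.util.Properties rather than Samza's own config classes.

```java
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: lists "system.stream" names whose
// systems.<system>.streams.<stream>.samza.bootstrap key is "true".
// Plain string parsing for illustration; not Samza's config code.
class BootstrapConfigCheck {
    private static final String PREFIX = "systems.";
    private static final String MIDDLE = ".streams.";
    private static final String SUFFIX = ".samza.bootstrap";

    static Set<String> bootstrapStreams(Properties props) {
        Set<String> result = new TreeSet<>();
        for (String key : props.stringPropertyNames()) {
            // Skip keys that cannot hold both a system and a stream name.
            if (!key.startsWith(PREFIX) || !key.endsWith(SUFFIX)
                    || key.length() <= PREFIX.length() + SUFFIX.length()) {
                continue;
            }
            String body = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
            int idx = body.indexOf(MIDDLE);
            if (idx <= 0 || idx + MIDDLE.length() >= body.length()) {
                continue; // missing system or stream name
            }
            String system = body.substring(0, idx);
            String stream = body.substring(idx + MIDDLE.length());
            if (Boolean.parseBoolean(props.getProperty(key).trim())) {
                result.add(system + "." + stream);
            }
        }
        return result;
    }
}
```

Running this over the full job config and comparing the result with task.inputs gives a quick sanity check. A correct config was not enough on its own here, though. As the thread goes on to show, the bug tracked as SAMZA-145 left bootstrapping disabled (useBootstrapping=false in the log) even with these keys set.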

Do bootstrap streams get handled in any special way? I'm assuming that the messages will arrive in the process method on StreamTask just like any other, and that I can handle them differently by switching on envelope.getSystemStreamPartition().getSystemStream().getStream(). Looking at the code, the BootstrapChooser does its magic to determine which message is delivered to the task, but the actual delivery seems the same.
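Bootstrap messages reach process() like any other message, as Chris confirms later in the thread. The routing described here is therefore an ordinary switch on the stream name. The minimal sketch below makes two assumptions. First, Envelope is a hypothetical stand-in for Samza's IncomingMessageEnvelope; in a real task the name would come from envelope.getSystemStreamPartition().getStream(). Second, SentimentTask is a hypothetical class that uses a naive whitespace tokenizer.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical stand-in for Samza's IncomingMessageEnvelope, reduced to
// the stream name and the message payload.
class Envelope {
    final String stream;
    final Object message;

    Envelope(String stream, Object message) {
        this.stream = stream;
        this.message = message;
    }
}

// Hypothetical task: the two bootstrap streams fill the word sets and
// every other stream is scored against them.
class SentimentTask {
    private final Set<String> positive = new HashSet<>();
    private final Set<String> negative = new HashSet<>();

    // Returns a score for ordinary messages, or null for reference data.
    Integer process(Envelope envelope) {
        String text = String.valueOf(envelope.message).toLowerCase(Locale.ROOT);
        switch (envelope.stream) {
            case "positive-words":
                positive.add(text.trim());
                return null;
            case "negative-words":
                negative.add(text.trim());
                return null;
            default:
                return score(text);
        }
    }

    // +1 per known positive word, -1 per known negative word.
    private int score(String text) {
        int total = 0;
        for (String token : text.trim().split("\\s+")) {
            if (positive.contains(token)) total++;
            if (negative.contains(token)) total--;
        }
        return total;
    }
}
```

This design relies on bootstrapping working correctly. If ordinary messages arrive before the word streams have been fully read, they are scored against an incomplete word list. That is why the interleaving reported later in the thread matters.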

What am I missing?

Thanks,
Garry


RE: Using bootstrap streams

Posted by Garry Turkington <g....@improvedigital.com>.
Hi Chris,

Woot! Yes, bootstrapped streams look like they are working properly with the SAMZA-145 patch. Thanks for the quick fix, appreciated.

Setting the default offset to smallest at the system level is a bit of a kludge for me; I'll indeed need to set up separate system definitions to have different defaults for the bootstrapped vs. non-bootstrapped streams. So SAMZA-144 is of interest to me, and I'll try to look at it over the weekend.
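The per-stream default discussed here (SAMZA-144) amounts to a lookup that checks a stream-level consumer key before the system-level one. The sketch below is hypothetical. Per the thread, Samza did not support stream-level Kafka consumer overrides at this point, and ConsumerConfigLookup is an illustrative name only.

```java
import java.util.Properties;

// Hypothetical lookup order for a stream-level override (the feature tracked
// as SAMZA-144; not supported at the time of this thread):
//   1. systems.<system>.streams.<stream>.consumer.<key>
//   2. systems.<system>.consumer.<key>
//   3. a caller-supplied default
class ConsumerConfigLookup {
    static String resolve(Properties props, String system, String stream,
                          String key, String defaultValue) {
        String streamKey = "systems." + system + ".streams." + stream + ".consumer." + key;
        String systemKey = "systems." + system + ".consumer." + key;
        String value = props.getProperty(streamKey);
        if (value == null) {
            value = props.getProperty(systemKey); // fall back to system level
        }
        return value != null ? value : defaultValue;
    }
}
```

Under the two-system workaround, the bootstrap streams and the tweet stream live in separate system definitions. Each system carries its own system-level auto.offset.reset, so resolution always falls through to step 2.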

Thanks again!
Garry

-----Original Message-----
From: Chris Riccomini [mailto:criccomini@linkedin.com] 
Sent: 13 February 2014 05:13
To: dev@samza.incubator.apache.org
Subject: Re: Using bootstrap streams

Hey Garry,

SAMZA-145 is merged in. Want to give stuff a shot with the latest patch?

Thanks for your patience. :)

Cheers,
Chris

On 2/12/14 6:28 PM, "Chris Riccomini" <cr...@linkedin.com> wrote:

>Hey Garry,
>
>Yep, this is definitely a bug. I've opened:
>
>  https://issues.apache.org/jira/browse/SAMZA-145
>
>
>I've also posted a patch, and will merge it in as soon as I get a +1 
>(by tomorrow).
>
>Cheers,
>Chris
>
>On 2/12/14 5:10 PM, "Garry Turkington" 
><g....@improvedigital.com>
>wrote:
>
>>Hi Chris,
>>
>>OK, so that did have an effect. :)
>>
>>Adding the Kafka-level offset as 'smallest' I do indeed see messages 
>>from the positive and negative word streams arrive in my stream task. 
>>But they seem to be appearing interleaved with the other 
>>non-bootstrapped stream so I think I'm seeing each stream being read 
>>from the smallest offset but with no priority given to the 
>>bootstrapped streams. And I still see this in the container logs, am I 
>>correct in assuming this should be showing selection of the BootstrapChooser?
>>
>>DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=false, usePriority=false
>>
>>Thanks for the help with this, hugely appreciated!
>>
>>Garry
>>
>>
>>-----Original Message-----
>>From: Chris Riccomini [mailto:criccomini@linkedin.com]
>>Sent: 12 February 2014 22:07
>>To: dev@samza.incubator.apache.org
>>Subject: Re: Using bootstrap streams
>>
>>Hey Garry,
>>
>>I've opened:
>>
>>  https://issues.apache.org/jira/browse/SAMZA-144
>>
>>To track the stream-level Kafka configuration override issue.
>>
>>Let me know if the "smallest" setting works for you.
>>
>>Cheers,
>>Chris
>>
>>On 2/12/14 11:21 AM, "Chris Riccomini" <cr...@linkedin.com> wrote:
>>
>>>Hey Garry,
>>>
>>>I just noticed that we don't actually support stream-level overrides 
>>>for Kafka configs.
>>>
>>>To test this, you'll have to set the consumer settings at the system level:
>>>
>>>systems.kafka.consumer.auto.offset.reset=smallest
>>>
>>>
>>>Note that this will cause you to read all data from kafka.tweets the 
>>>first time you run your job, as well. I think this is probably what 
>>>you want, but if not, you'd have to define two systems: one for the 
>>>bootstrap streams, and one for the tweet stream, so that you could 
>>>configure the bootstrap system to have the "smallest" reset setting.
>>>
>>>Cheers,
>>>Chris
>>>
>>>On 2/12/14 11:06 AM, "Chris Riccomini" <cr...@linkedin.com> wrote:
>>>
>>>>Hey Garry,
>>>>
>>>>I believe this is a similar issue to SAMZA-142.
>>>>
>>>>Can you try adding a config to set auto.offset.reset to smallest?
>>>>Something like:
>>>>
>>>>  systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest
>>>>  systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest
>>>>
>>>>
>>>>This should change this log line:
>>>>
>>>>Final offset to be returned for Topic and Partition [positive-words,0] = 2006
>>>>
>>>>
>>>>To something like:
>>>>
>>>>Final offset to be returned for Topic and Partition [positive-words,0] = 0
>>>>
>>>>
>>>>Cheers,
>>>>Chris
>>>>
>>>>On 2/12/14 10:44 AM, "Garry Turkington"
>>>><g....@improvedigital.com>
>>>>wrote:
>>>>
>>>>>Hi Chris,
>>>>>
>>>>>Sorry for the wrong log file!
>>>>>
>>>>>Samza container log is at:
>>>>>http://pastebin.com/D5bAJd7U
>>>>>
>>>>>I do notice that it mentions returning the highest offset for the 
>>>>>supposedly bootstrapped streams which I presume shouldn't be 
>>>>>happening.
>>>>>
>>>>>Thanks,
>>>>>Garry
>>>>>
>>>>>-----Original Message-----
>>>>>From: Chris Riccomini [mailto:criccomini@linkedin.com]
>>>>>Sent: 12 February 2014 17:42
>>>>>To: dev@samza.incubator.apache.org
>>>>>Subject: Re: Using bootstrap streams
>>>>>
>>>>>Hey Garry,
>>>>>
>>>>>So far, everything looks normal.
>>>>>
>>>>>The container log you sent me actually appears to be the output 
>>>>>of the run-job.sh command. Since you're using YarnJobFactory, this 
>>>>>is not actually the container log. Could you grab the log from the 
>>>>>container that's running in YARN, and stick that in pastebin? You 
>>>>>can usually find this by going to YARN's RM (http://localhost:8088) 
>>>>>and finding the link to your ApplicationMaster. This will link to 
>>>>>the logs for each container that's running your tasks.
>>>>>
>>>>>Cheers,
>>>>>Chris
>>>>>
>>>>>On 2/11/14 2:11 PM, "Garry Turkington"
>>>>><g....@improvedigital.com>
>>>>>wrote:
>>>>>
>>>>>>Hi Chris,
>>>>>>
>>>>>>Following up on this, sorry for the delay, travelling this week.
>>>>>>
>>>>>>Main task config:
>>>>>>http://pastebin.com/enQXLcbZ
>>>>>>
>>>>>>Container log:
>>>>>>http://pastebin.com/YLiKp0CS
>>>>>>
>>>>>>I'm putting the positive and negative words into the bootstrap 
>>>>>>streams prior to running the job -- and confirmed the data is in 
>>>>>>the Kafka stream via kafka-console-consumer.sh with the 
>>>>>>--from-beginning option.
>>>>>>
>>>>>>Thanks for any input!
>>>>>>Garry
>>>>>>
>>>>>>-----Original Message-----
>>>>>>From: Chris Riccomini [mailto:criccomini@linkedin.com]
>>>>>>Sent: 10 February 2014 23:25
>>>>>>To: dev@samza.incubator.apache.org
>>>>>>Subject: Re: Using bootstrap streams
>>>>>>
>>>>>>Hey Garry,
>>>>>>
>>>>>>It sounds like your understanding of bootstrap streams is correct.
>>>>>>
>>>>>>Bootstrap stream messages will be delivered to the process() 
>>>>>>method just like any other. The only difference is you're supposed 
>>>>>>to get all of them from 0-lastOffset before you get any messages 
>>>>>>from non-bootstrap streams.
>>>>>>Your positive/negative example sounds like a reasonable use case 
>>>>>>for a bootstrap stream.
>>>>>>
>>>>>>A few questions:
>>>>>>
>>>>>>1. Can you post the container logs and the full configuration file 
>>>>>>for your job somewhere (e.g. Github gist)?
>>>>>>2. Are you putting data into the positive-words and negative-words 
>>>>>>topic before you start the Samza job?
>>>>>>
>>>>>>Also, you can do envelope.getSystemStreamPartition().getStream()
>>>>>>directly (no need to call getSystemStream()).
>>>>>>
>>>>>>Cheers,
>>>>>>Chris
>>>>>>
>>>>>>
>>>>>>
>>>>>>-----
>>>>>>No virus found in this message.
>>>>>>Checked by AVG - www.avg.com
>>>>>>Version: 2014.0.4259 / Virus Database: 3697/7081 - Release Date:
>>>>>>02/10/14
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
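The first symptom in the quoted exchange was the bootstrap streams starting at offset 2006 instead of 0. That follows from how the starting offset is chosen. The model below is simplified and rests on two assumptions: samza.reset.offset=true discards any checkpoint, and the consumer's auto.offset.reset policy then picks the earliest ("smallest") or latest ("largest") available offset. StartOffsetSketch is a hypothetical name, and this is not Samza's or Kafka's actual code.

```java
// Simplified, hypothetical model of choosing a partition's starting offset.
class StartOffsetSketch {
    // checkpoint: offset to resume from, or null if none was recorded.
    static long startOffset(Long checkpoint, boolean resetOffset,
                            String autoOffsetReset, long earliest, long latest) {
        if (checkpoint != null && !resetOffset) {
            return checkpoint; // resume where the job left off
        }
        // No usable checkpoint: the reset policy decides.
        if ("smallest".equals(autoOffsetReset)) return earliest;
        if ("largest".equals(autoOffsetReset)) return latest;
        throw new IllegalArgumentException("unknown auto.offset.reset: " + autoOffsetReset);
    }
}
```

In this model, startOffset(null, true, "largest", 0, 2006) gives 2006, and switching the policy to "smallest" gives 0. That mirrors the change in the log line Chris describes.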


Re: Using bootstrap streams

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Garry,

SAMZA-145 is merged in. Want to give stuff a shot with the latest patch?

Thanks for your patience. :)

Cheers,
Chris



Re: Using bootstrap streams

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Garry,

Yep, this is definitely a bug. I've opened:

  https://issues.apache.org/jira/browse/SAMZA-145


I've also posted a patch, and will merge it in as soon as I get a +1 (by
tomorrow).

Cheers,
Chris



RE: Using bootstrap streams

Posted by Garry Turkington <g....@improvedigital.com>.
Hi Chris,

OK, so that did have an effect. :)

Adding the Kafka-level offset as 'smallest' I do indeed see messages from the positive and negative word streams arrive in my stream task. But they seem to be appearing interleaved with the other non-bootstrapped stream so I think I'm seeing each stream being read from the smallest offset but with no priority given to the bootstrapped streams. And I still see this in the container logs, am I correct in assuming this should be showing selection of the BootstrapChooser?

DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=false, usePriority=false
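The ordering expected here can be modelled in a few lines. This is a simplified, single-partition sketch, not Samza's BootstrapChooser. Each stream's list stands for the messages already in the topic at startup, and streams are polled round-robin. While any bootstrap stream still has pending messages, the non-bootstrap streams are skipped. The useBootstrapping parameter is named after the flag in the log line above. With it set to false the streams interleave, which matches the symptom described.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of bootstrap ordering (not Samza's BootstrapChooser):
// one partition per stream, no batching or priorities.
class BootstrapOrderSketch {
    // streams: stream name -> messages already in the topic at startup.
    // Returns "stream:message" strings in the order a task would see them.
    static List<String> deliveryOrder(Map<String, List<String>> streams,
                                      Set<String> bootstrapStreams,
                                      boolean useBootstrapping) {
        Map<String, Deque<String>> pending = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : streams.entrySet()) {
            pending.put(e.getKey(), new ArrayDeque<>(e.getValue()));
        }
        List<String> order = new ArrayList<>();
        boolean deliveredSomething = true;
        while (deliveredSomething) {
            deliveredSomething = false;
            // Bootstrapping lasts while any bootstrap stream still has backlog.
            boolean bootstrapping = false;
            if (useBootstrapping) {
                for (String s : bootstrapStreams) {
                    Deque<String> q = pending.get(s);
                    if (q != null && !q.isEmpty()) bootstrapping = true;
                }
            }
            // One round-robin pass over all streams.
            for (Map.Entry<String, Deque<String>> e : pending.entrySet()) {
                if (bootstrapping && !bootstrapStreams.contains(e.getKey())) continue;
                String msg = e.getValue().poll();
                if (msg != null) {
                    order.add(e.getKey() + ":" + msg);
                    deliveredSomething = true;
                }
            }
        }
        return order;
    }
}
```

With tweets t1 and t2 alongside the word streams, useBootstrapping=true delivers every word before t1. With false, t1 arrives first, interleaved with the words.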

Thanks for the help with this, hugely appreciated!

Garry


-----Original Message-----
From: Chris Riccomini [mailto:criccomini@linkedin.com] 
Sent: 12 February 2014 22:07
To: dev@samza.incubator.apache.org
Subject: Re: Using bootstrap streams

Hey Garry,

I've opened:

  https://issues.apache.org/jira/browse/SAMZA-144

To track the stream-level Kafka configuration override issue.

Let me know if the "smallest" setting works for you.

Cheers,
Chris

On 2/12/14 11:21 AM, "Chris Riccomini" <cr...@linkedin.com> wrote:

>Hey Garry,
>
>I just noticed that we don't actually support stream-level overrides 
>for Kafka configs.
>
>To tes this, you'll have to set the consumer settings at the system level:
>
>systems.kafka.consumer.auto.offset.reset=smallest
>
>
>Note that this will cause you to read all data from kafka.tweets the 
>first time you run your job, as well. I think this is probably what you 
>want, but if not, you'd have to define two systems: one for the 
>bootstrap streams, and one for the tweet stream, so that you could 
>configure the bootstrap system to have the "smallest" reset setting.
>
>Cheers,
>Chris
>
>On 2/12/14 11:06 AM, "Chris Riccomini" <cr...@linkedin.com> wrote:
>
>>Hey Garry,
>>
>>I believe this is a similar issue to SAMZA-142.
>>
>>Can you try adding a config to set auto.offset.reset to smallest?
>>Something like:
>>
>>  
>>systems.kafka.streams.positive-words.consumer.auto.offset.reset=smalle
>>st
>>  
>>systems.kafka.streams.negative-words.consumer.auto.offset.reset=smalle
>>st
>>
>>
>>This should change this log line:
>>
>>Final offset to be returned for Topic and Partition [positive-words,0] 
>>=
>>2006
>>
>>
>>To something like:
>>
>>Final offset to be returned for Topic and Partition [positive-words,0] 
>>=
>>0
>>
>>
>>Cheers,
>>Chris
>>
>>On 2/12/14 10:44 AM, "Garry Turkington" 
>><g....@improvedigital.com>
>>wrote:
>>
>>>Hi Chris,
>>>
>>>Sorry for the wrong log file!
>>>
>>>Samza  container log is at:
>>>http://pastebin.com/D5bAJd7U
>>>
>>>I do notice that it mentions returning the highest offset for the 
>>>supposedly bootstrapped streams which I presume shouldn't be happening.
>>>
>>>Thanks,
>>>Garry
>>>
>>>-----Original Message-----
>>>From: Chris Riccomini [mailto:criccomini@linkedin.com]
>>>Sent: 12 February 2014 17:42
>>>To: dev@samza.incubator.apache.org
>>>Subject: Re: Using bootstrap streams
>>>
>>>Hey Garry,
>>>
>>>So far, everything looks normal.
>>>
>>>The container you log you sent me actually appears to be the output 
>>>of the run-job.sh command. Since you're using YarnJobFactory, this is 
>>>not actually the container log. Could you grab the log from the 
>>>container that's running in YARN, and stick that in pastebin? You can 
>>>usually find this by going to YARN's RM (http://localhost:8088) and 
>>>finding the link to your ApplicationMaster. This will link to the 
>>>logs for each container that's running your tasks.
>>>
>>>Cheers,
>>>Chris
>>>
>>>On 2/11/14 2:11 PM, "Garry Turkington" 
>>><g....@improvedigital.com>
>>>wrote:
>>>
>>>>Hi Chris,
>>>>
>>>>Following up on this, sorry for the delay, travelling this week.
>>>>
>>>>Main task config:
>>>>http://pastebin.com/enQXLcbZ
>>>>
>>>>Container log:
>>>>http://pastebin.com/YLiKp0CS
>>>>
>>>>I'm putting the positive and negative words into the bootstrap 
>>>>streams prior to running the job -- and confirmed the data is in the 
>>>>Kafka stream via kafka-console-consumer.sh with the --from-beginning option.
>>>>
>>>>Thanks for any input!
>>>>Garry
>>>>
>>>>-----Original Message-----
>>>>From: Chris Riccomini [mailto:criccomini@linkedin.com]
>>>>Sent: 10 February 2014 23:25
>>>>To: dev@samza.incubator.apache.org
>>>>Subject: Re: Using bootstrap streams
>>>>
>>>>Hey Garry,
>>>>
>>>>It sounds like your understanding of bootstrap streams is correct.
>>>>
>>>>Bootstrap stream messages will be delivered to the process() method 
>>>>just like any other. The only difference is you're supposed to get 
>>>>all of them from 0-lastOffset before you get any messages from 
>>>>non-bootstrap streams.
>>>>Your positive/negative example sounds like a reasonable use case for 
>>>>a bootstrap stream.
>>>>
>>>>A few questions:
>>>>
>>>>1. Can you post the container logs and the full configuration file 
>>>>for your job somewhere (e.g. Github gist)?
>>>>2. Are you putting data into the positive-words and negative-words 
>>>>topic before you start the Samza job?
>>>>
>>>>Also, you can do envelope.getSystemStreamPartition().getStream()
>>>>directly (no need to call getSystemStream()).
>>>>
>>>>Cheers,
>>>>Chris
>>>>
>>>>On 2/10/14 3:18 AM, "Garry Turkington"
>>>><g....@improvedigital.com>
>>>>wrote:
>>>>
>>>>>Hi,
>>>>>
>>>>>I was building a task to do some sentiment analysis on incoming data.
>>>>>I have a corpus each of positive and negative words to which the 
>>>>>task needs access. This seemed like a good fit for bootstrap 
>>>>>streams. But I can't seem to get them to work.
>>>>>
>>>>>I have my job configured with the 3 Kafka topics in task.inputs and 
>>>>>that seems to work, just throwing data at any of the topics is 
>>>>>hitting the task.
>>>>>
>>>>>But setting up the 2 reference streams as bootstrap doesn't seem to 
>>>>>be working. Here's the relevant part of the config, I want to read 
>>>>>the entire message history each time:
>>>>>
>>>>>systems.kafka.streams.positive-words.samza.bootstrap=true
>>>>>systems.kafka.streams.positive-words.samza.reset.offset=true
>>>>>
>>>>>systems.kafka.streams.negative-words.samza.bootstrap=true
>>>>>systems.kafka.streams.negative-words.samza.reset.offset=true
>>>>>
>>>>>Do bootstrap streams get handled in any special way, I'm assuming 
>>>>>here that the messages will arrive in the process method on 
>>>>>StreamTask just like any other and I can handle them differently by 
>>>>>switching on envelope.getSystemStreamPartition().getSystemStream().getStream().
>>>>>Looking at the code it looks the same with the BootstrapChooser 
>>>>>doing its magic to determine which message is delivered to the task 
>>>>>but the actual delivery seems the same.
>>>>>
>>>>>What am I missing?
>>>>>
>>>>>Thanks,
>>>>>Garry
>>>>>
>>>>
>>>
>>
>


Re: Using bootstrap streams

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Garry,

I've opened:

  https://issues.apache.org/jira/browse/SAMZA-144

To track the stream-level Kafka configuration override issue.

Let me know if the "smallest" setting works for you.

Cheers,
Chris


Re: Using bootstrap streams

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Garry,

I just noticed that we don't actually support stream-level overrides for
Kafka configs.

To test this, you'll have to set the consumer settings at the system level:

systems.kafka.consumer.auto.offset.reset=smallest


Note that this will cause you to read all data from kafka.tweets the first
time you run your job, as well. I think this is probably what you want,
but if not, you'd have to define two systems: one for the bootstrap
streams, and one for the tweet stream, so that you could configure the
bootstrap system to have the "smallest" reset setting.
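
A minimal sketch of the two-system layout, written as a java.util.Properties builder so the keys can be run and checked. The system name kafka-bootstrap is hypothetical, the Kafka connection settings a real job would need for the second system are omitted, and effectiveReset() only models the behaviour described above (system-level consumer settings apply, stream-level ones do not). It is not Samza code.

```java
import java.util.Properties;

/**
 * Sketch of the "two systems" workaround. "kafka-bootstrap" is a made-up
 * system name; both systems would point at the same Kafka cluster.
 */
class TwoSystemConfig {

    static Properties buildConfig() {
        Properties p = new Properties();
        // Existing system for the tweet stream: start from new messages only.
        p.setProperty("systems.kafka.consumer.auto.offset.reset", "largest");
        // Second system for the word lists. It also needs the same factory and
        // connection settings as "kafka" (connection settings omitted here).
        p.setProperty("systems.kafka-bootstrap.samza.factory",
                "org.apache.samza.system.kafka.KafkaSystemFactory");
        p.setProperty("systems.kafka-bootstrap.consumer.auto.offset.reset", "smallest");
        p.setProperty("systems.kafka-bootstrap.streams.positive-words.samza.bootstrap", "true");
        p.setProperty("systems.kafka-bootstrap.streams.negative-words.samza.bootstrap", "true");
        // Inputs are addressed as <system>.<stream>.
        p.setProperty("task.inputs",
                "kafka.tweets,kafka-bootstrap.positive-words,kafka-bootstrap.negative-words");
        return p;
    }

    /**
     * Models the limitation described above: only the system-level consumer
     * key is consulted, so a streams.<name>.consumer.* override has no effect.
     * Falls back to "largest" (assumed Kafka 0.8 default) when unset.
     */
    static String effectiveReset(Properties p, String system) {
        return p.getProperty("systems." + system + ".consumer.auto.offset.reset", "largest");
    }

    public static void main(String[] args) {
        Properties p = buildConfig();
        System.out.println(effectiveReset(p, "kafka"));           // largest
        System.out.println(effectiveReset(p, "kafka-bootstrap")); // smallest
    }
}
```

With this split, only the word-list topics are replayed from the beginning; kafka.tweets keeps starting from new messages.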

Cheers,
Chris


Re: Using bootstrap streams

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Garry,

I believe this is a similar issue to SAMZA-142.

Can you try adding a config to set auto.offset.reset to smallest?
Something like:

  systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest
  systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest


This should change this log line:

Final offset to be returned for Topic and Partition [positive-words,0] = 2006


To something like:

Final offset to be returned for Topic and Partition [positive-words,0] = 0
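
A hypothetical model of the reset rule as it applies here: with no usable checkpoint, "largest" starts at the log-end offset (only new messages) and "smallest" at the earliest retained offset (the whole history). The numbers assume an earliest offset of 0 and a log-end offset of 2006, as the two log lines suggest; this is an illustration, not the Kafka client's code.

```java
/**
 * Hypothetical model of how a Kafka 0.8-era consumer picks its starting
 * offset when there is no usable checkpoint.
 */
class OffsetResetModel {

    static long startingOffset(String autoOffsetReset, long earliest, long logEnd) {
        switch (autoOffsetReset) {
            case "smallest":
                return earliest; // replay everything still retained
            case "largest":
                return logEnd;   // skip history; wait for new messages
            default:
                throw new IllegalArgumentException(
                        "expected smallest or largest, got: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset("largest", 0, 2006));  // 2006
        System.out.println(startingOffset("smallest", 0, 2006)); // 0
    }
}
```

Starting a bootstrap stream at 2006 means it is already "caught up" with nothing to deliver, which matches the symptom in the container log.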


Cheers,
Chris


RE: Using bootstrap streams

Posted by Garry Turkington <g....@improvedigital.com>.
Hi Chris,

Sorry for the wrong log file!

Samza container log is at:
http://pastebin.com/D5bAJd7U

I do notice that it mentions returning the highest offset for the supposedly bootstrapped streams which I presume shouldn't be happening.

Thanks,
Garry


Re: Using bootstrap streams

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Garry,

So far, everything looks normal.

The container log you sent me actually appears to be the output of the
run-job.sh command. Since you're using YarnJobFactory, this is not
actually the container log. Could you grab the log from the container
that's running in YARN, and stick that in pastebin? You can usually find
this by going to YARN's RM (http://localhost:8088) and finding the link to
your ApplicationMaster. This will link to the logs for each container
that's running your tasks.

Cheers,
Chris


RE: Using bootstrap streams

Posted by Garry Turkington <g....@improvedigital.com>.
Hi Chris,

Following up on this, sorry for the delay, travelling this week.

Main task config:
http://pastebin.com/enQXLcbZ

Container log:
http://pastebin.com/YLiKp0CS

I'm putting the positive and negative words into the bootstrap streams prior to running the job -- and confirmed the data is in the Kafka stream via kafka-console-consumer.sh with the --from-beginning option.

Thanks for any input!
Garry


Re: Using bootstrap streams

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Garry,

It sounds like your understanding of bootstrap streams is correct.

Bootstrap stream messages will be delivered to the process() method just
like any other. The only difference is you're supposed to get all of them
from 0-lastOffset before you get any messages from non-bootstrap streams.
Your positive/negative example sounds like a reasonable use case for a
bootstrap stream.
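
A simplified, hypothetical model of that ordering in plain Java (it is not Samza's BootstrapChooser): every bootstrap queue is drained before any other stream is served. In this static model a queue holds exactly offsets 0..lastOffset, so "empty" stands in for "caught up"; serving the remaining streams round-robin is an arbitrary choice for the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class BootstrapOrderModel {

    /** Returns "stream:message" strings in the order a task would see them. */
    static List<String> deliveryOrder(Map<String, ArrayDeque<String>> queues,
                                      Set<String> bootstrapStreams) {
        List<String> order = new ArrayList<>();
        // Phase 1: drain every bootstrap stream completely.
        for (Map.Entry<String, ArrayDeque<String>> e : queues.entrySet()) {
            if (!bootstrapStreams.contains(e.getKey())) continue;
            while (!e.getValue().isEmpty()) {
                order.add(e.getKey() + ":" + e.getValue().poll());
            }
        }
        // Phase 2: only now serve the other streams, one message each per pass.
        boolean progress = true;
        while (progress) {
            progress = false;
            for (Map.Entry<String, ArrayDeque<String>> e : queues.entrySet()) {
                if (bootstrapStreams.contains(e.getKey())) continue;
                String msg = e.getValue().poll(); // null when the queue is empty
                if (msg != null) {
                    order.add(e.getKey() + ":" + msg);
                    progress = true;
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, ArrayDeque<String>> queues = new LinkedHashMap<>();
        // Tweets are registered first to show that registration order does not matter.
        queues.put("tweets", new ArrayDeque<>(Arrays.asList("t1", "t2")));
        queues.put("positive-words", new ArrayDeque<>(Arrays.asList("good", "great")));
        queues.put("negative-words", new ArrayDeque<>(Arrays.asList("bad")));
        System.out.println(deliveryOrder(queues, Set.of("positive-words", "negative-words")));
        // [positive-words:good, positive-words:great, negative-words:bad, tweets:t1, tweets:t2]
    }
}
```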

A few questions:

1. Can you post the container logs and the full configuration file for
your job somewhere (e.g. GitHub gist)?
2. Are you putting data into the positive-words and negative-words topic
before you start the Samza job?

Also, you can do envelope.getSystemStreamPartition().getStream() directly
(no need to call getSystemStream()).
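
A sketch of the switch on the stream name discussed in this thread, assuming each word-list message holds a single word and tweets are plain text. SentimentRouter and handle() are hypothetical stand-ins for the body of process(): in a real StreamTask, stream would come from envelope.getSystemStreamPartition().getStream() and message from envelope.getMessage(). Both are passed as strings here so the sketch runs without Samza on the classpath. Because bootstrap streams are delivered first, the word sets should be complete before the first tweet is scored.

```java
import java.util.HashSet;
import java.util.Set;

class SentimentRouter {
    private final Set<String> positive = new HashSet<>();
    private final Set<String> negative = new HashSet<>();

    /** Returns a score for tweets, or null for word-list messages. */
    Integer handle(String stream, String message) {
        switch (stream) {
            case "positive-words":
                positive.add(message.trim().toLowerCase());
                return null;
            case "negative-words":
                negative.add(message.trim().toLowerCase());
                return null;
            default:
                return score(message); // any other stream, e.g. tweets
        }
    }

    /** +1 per positive word, -1 per negative word, case-insensitive. */
    int score(String text) {
        int s = 0;
        for (String word : text.toLowerCase().split("\\s+")) {
            if (positive.contains(word)) s++;
            if (negative.contains(word)) s--;
        }
        return s;
    }

    public static void main(String[] args) {
        SentimentRouter r = new SentimentRouter();
        r.handle("positive-words", "good");
        r.handle("negative-words", "bad");
        System.out.println(r.handle("tweets", "Good good bad")); // 1
    }
}
```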

Cheers,
Chris
