Posted to dev@samza.apache.org by Jonathan Poltak Samosir <jo...@gmail.com> on 2014/02/01 01:24:21 UTC

Re: Clarification on SystemConsumer method functionality

Hi Chris,

Oh dear, that's rather embarrassing, although it makes perfect sense now.

Here are the first ~900 lines of the second container's log (that warning continues to repeat):
https://gist.github.com/poltak/8745919/raw/7990b0199fafde9bf6da00aa4b87b2e14fb0e717/gistfile1.txt

There definitely seems to be an issue there. Unfortunately, I cannot work out where exactly the problem lies since the String cast error message is truncated. Do you know of any easy way to get the rest of this message?

Thank you for your patience as well. It is very much appreciated!

Jonathan


On 31 Jan 2014, at 11:20, Chris Riccomini <cr...@linkedin.com> wrote:

> Hey Jonathan,
> 
> Could you send the logs for the actual container? The AM isn't actually
> running your code--it just manages and tells YARN to start a SECOND
> container to run your code. :)
> 
> For example, in the AM logs, you'll see:
> 
> 2014-01-30 18:36:46 SamzaAppMasterTaskManager [INFO] Claimed task ID 0 for
> container container_1391135766129_0001_01_000002 on node s1
> (http://s1:8042/node/containerlogs/container_1391135766129_0001_01_000002).
> 
> This is the log path to the container that's running your code.
> 
> Alternatively, you can find the second container by clicking on the
> ApplicationMaster link in the YARN RM web UI, and then clicking on the
> link in the "running containers" section that ends with _000002.
> 
> If you could grab that log and send it here, I can take a look and help
> you figure out what's going on.
> 
> Cheers,
> Chris
> 
> On 1/30/14 7:23 PM, "Jonathan Poltak Samosir" <jo...@gmail.com>
> wrote:
> 
>> Hi Chris,
>> 
>> Thanks for your thoughts.
>> 
>> 1. I will definitely move the file reading to another thread; I just
>> wanted to get it working with a very simple test file before I move on
>> and re-implement my code to be more robust. Thanks also for the pointers
>> on checking for space before calling BlockingEnvelopeMap.put(). I will
>> definitely come back to that afterwards.
>> 
>> 2. Again, will definitely revisit this point later on as well as the file
>> offset tip.
>> 
>> 3. Here are the logs:
>> application-master.log:
>> https://gist.github.com/poltak/8726030
>> 
>> gc.log (probably not needed, but it's there):
>> https://gist.github.com/poltak/8726004
>> 
>> stdout:
>> https://gist.github.com/poltak/8726036
>> 
>> stderr was clean.
>> 
>> Not sure if you will find anything useful out of those, but I'm sure your
>> understanding of it will greatly outweigh mine.
>> 
>> Thanks,
>> Jonathan
>> 
>> 
>> On 30 Jan 2014, at 15:47, Chris Riccomini <cr...@linkedin.com> wrote:
>> 
>>> Hey Jonathan,
>>> 
>>> Another follow-on thought. You're currently using null for the offset,
>>> but you could very easily use filepos, and then fseek to the filepos when
>>> a SystemStreamPartition is instantiated. This would give you the ability
>>> to "pick up where you left off" rather than re-reading the whole file
>>> every time your SamzaContainer starts.
>>> 
>>> Cheers,
>>> Chris
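Chris's filepos/fseek suggestion can be sketched in plain Java, leaving out the Samza types. OffsetLineReader and Line are hypothetical names, not part of Samza or of Jonathan's consumer. The sketch assumes ASCII input (RandomAccessFile.readLine decodes one byte per char) and treats a saved offset as pointing just past the last line processed, which is why each Line carries the position after it; whether register() hands you "last processed" or "next to read" should be checked against the SystemConsumer Javadoc for the Samza version in use.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reads a file from a byte offset and tags every line with the byte
// position just after it, so that position can be stored and used to resume later.
final class OffsetLineReader {

    /** One line of the file plus the offset at which reading should resume after it. */
    static final class Line {
        final String text;
        final String nextOffset;

        Line(String text, String nextOffset) {
            this.text = text;
            this.nextOffset = nextOffset;
        }
    }

    static List<Line> readFrom(Path file, String offset) {
        // A null offset (what the consumer currently uses) means "start from the beginning".
        long start = (offset == null) ? 0L : Long.parseLong(offset);
        List<Line> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(start); // the "fseek to filepos" step
            String text;
            // readLine() decodes one byte per char, so this assumes ASCII input.
            while ((text = raf.readLine()) != null) {
                lines.add(new Line(text, Long.toString(raf.getFilePointer())));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```

A consumer following this idea would pass the offset string it receives in register() to readFrom and use each Line's nextOffset as the offset of the envelope built from that line.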
>>> 
>>> On 1/30/14 3:43 PM, "Chris Riccomini" <cr...@linkedin.com> wrote:
>>> 
>>>> Hey Jonathan,
>>>> 
>>>> I took a look at your code. Nothing looks horribly wrong at first glance.
>>>> A couple of thoughts:
>>>> 
>>>> 1. You're reading the full file and loading it into the
>>>> BlockingEnvelopeMap in your start() method. This is OK if the file is
>>>> small, and will ALWAYS be small. If the file is NOT small, you need to
>>>> move the file reading to another thread, and only call
>>>> BlockingEnvelopeMap.put() when there is space. The start/stop methods
>>>> would then start and stop your reader thread. The "when there is space"
>>>> behavior can be implemented in one of two ways: 1) check
>>>> BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines, and
>>>> block if it's above some threshold, or 2) override
>>>> BlockingEnvelopeMap.newBlockingQueue to use a bounded queue, which will
>>>> automatically force your reader thread to block if the queue is full.
>>>> 2. An alternative input path style would be to just define the input path
>>>> as the stream name. For example,
>>>> task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the
>>>> MedicalDataConsumer.register method call systemStreamPartition.getStream
>>>> to get the file path, and instantiate the file reader there. The
>>>> advantage of this approach is that you only need one MedicalDataConsumer
>>>> to read from N files, rather than having one MedicalDataConsumer (and
>>>> system) per file.
>>>> 3. Can you send the output log when running your job?
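Option 2 (a bounded queue) can be modelled without Samza: a reader thread launched by start() pushes lines into a LinkedBlockingQueue with a fixed capacity, so put() blocks whenever the queue is full and the file is never loaded into memory all at once. BoundedLineFeeder and its methods are hypothetical. In a real consumer, the bounded queue would come from overriding BlockingEnvelopeMap.newBlockingQueue, and the reader would hand IncomingMessageEnvelopes to BlockingEnvelopeMap.put() instead of raw strings.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a BlockingEnvelopeMap subclass with a bounded queue.
final class BoundedLineFeeder {
    private final BlockingQueue<String> queue;
    private final Thread reader;

    BoundedLineFeeder(Reader source, int capacity) {
        // Bounded queue: the analogue of overriding newBlockingQueue() with a capacity.
        BlockingQueue<String> q = new LinkedBlockingQueue<>(capacity);
        this.queue = q;
        this.reader = new Thread(() -> {
            try (BufferedReader in = new BufferedReader(source)) {
                String line;
                while ((line = in.readLine()) != null) {
                    q.put(line); // blocks while the queue is full: this is the backpressure
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() was called while put() was blocked
            }
        }, "file-reader");
        reader.setDaemon(true);
    }

    /** start() launches the reader thread instead of reading the whole file up front. */
    void start() {
        reader.start();
    }

    /** stop() interrupts the reader, which may be blocked in put(). */
    void stop() {
        reader.interrupt();
    }

    int queued() {
        return queue.size();
    }

    /** Poll-side view: wait up to timeoutMs for the next line; null if none arrived. */
    String poll(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

Option 1 (checking getNumMessagesInQueue against a threshold) trades this automatic blocking for an explicit check-and-wait loop in the reader thread.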
>>>> 
>>>> 
>>>> Cheers,
>>>> Chris
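Suggestion 2 hinges on splitting a task.inputs entry such as medicaldata./N/u/poltak/test.csv at its first '.', so that the system is medicaldata and the stream name is the whole file path, including the ".csv" extension. The sketch assumes that splitting rule (Chris's example only works with it). FileStreamRegistry is a hypothetical name, and its register method only mimics what MedicalDataConsumer.register would do with the path obtained from systemStreamPartition.getStream().

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: the stream name *is* the file path, so one consumer can
// serve any number of files listed in task.inputs.
final class FileStreamRegistry {

    /** Splits "system.stream" at the first '.', so dots later in a file path are kept. */
    static String[] splitSystemStream(String input) {
        int dot = input.indexOf('.');
        if (dot <= 0 || dot == input.length() - 1) {
            throw new IllegalArgumentException("expected <system>.<stream>, got: " + input);
        }
        return new String[] {input.substring(0, dot), input.substring(dot + 1)};
    }

    // file path -> starting offset; stands in for per-file reader state
    private final Map<Path, String> registered = new LinkedHashMap<>();

    /** Analogue of register(): the stream name is used directly as the file path. */
    void register(String streamName, String offset) {
        registered.put(Paths.get(streamName), offset);
    }

    int fileCount() {
        return registered.size();
    }
}
```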
>>>> 
>>>> On 1/30/14 2:56 PM, "Jonathan Poltak Samosir"
>>>> <jo...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Thanks for the help, Chris!
>>>>> 
>>>>> The Javadocs you updated certainly did help my understanding of the
>>>>> SystemConsumer interface, although I still have not been able to get my
>>>>> System to work.
>>>>> 
>>>>> I have now resorted to trying to extend the BlockingEnvelopeMap
>>>>> implementation of SystemConsumer, as it seemed a lot simpler for what I
>>>>> want to do, plus the hello-samza example uses it, so I can use that as a
>>>>> reference.
>>>>> 
>>>>> Even when I simply call 'put()' with a debug message at the beginning of
>>>>> the 'start()' method, nothing is received by my simple StreamTask (which
>>>>> simply forwards what is received from my System's stream onto a Kafka
>>>>> stream for now). As this is the case, I think there is a flaw in my
>>>>> understanding of something much more fundamental here relating to Samza
>>>>> Systems... I am sure there is something very simple that I am missing
>>>>> with my implementation, as I have based it directly on the
>>>>> WikipediaSystem, which I feel I have a pretty thorough understanding of
>>>>> now.
>>>>> 
>>>>> If anyone has time to look at my implementations of SystemConsumer,
>>>>> SystemFactory and the previously mentioned StreamTask's config file
>>>>> (they're all very simple and to the point, since I'm just trying to get
>>>>> things working), it would be very much appreciated.
>>>>> 
>>>>> SystemFactory implementation:
>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataSystemFactory.java
>>>>> 
>>>>> SystemConsumer implementation:
>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataConsumer.java
>>>>> 
>>>>> StreamTask config (which specifies input from my System):
>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties
>>>>> 
>>>>> And yes, once I get it working and reading a file into a Samza stream, I
>>>>> will be happy to submit a patch.
>>>>> 
>>>>> Thanks,
>>>>> Jonathan
>>>>> 
>>>>> 
>>>>> On 30 Jan 2014, at 10:26, Chris Riccomini <cr...@linkedin.com>
>>>>> wrote:
>>>>> 
>>>>>> Hey Jonathan,
>>>>>> 
>>>>>> I've attempted to answer your questions by updating the Javadocs. :)
>>>>>> 
>>>>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>>>>> 
>>>>>> Let me know if anything doesn't make sense.
>>>>>> 
>>>>>> Also, it'd be awesome if you could contribute your file reader
>>>>>> SystemConsumer. This seems like it'd be really useful for a lot of
>>>>>> things. Could you open a JIRA and submit a patch when you're ready?
>>>>>> 
>>>>>> Cheers,
>>>>>> Chris
>>>>>> 
>>>>>> On 1/29/14 11:14 PM, "Jonathan Poltak Samosir"
>>>>>> <jo...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hello,
>>>>>>> 
>>>>>>> Currently trying to write a simple Samza System that reads from a file
>>>>>>> and puts the contents onto a Samza-compatible stream. Been basing it off
>>>>>>> the hello-samza WikipediaSystem example so far. The SystemFactory
>>>>>>> implementation seemed to be pretty straightforward (get the path to the
>>>>>>> file from config and return a SystemConsumer that reads the file at that
>>>>>>> path), although I am not 100% sure what the purpose of each of the
>>>>>>> SystemConsumer interface methods is.
>>>>>>> 
>>>>>>> start() and stop() seem fairly self-explanatory, getting called when
>>>>>>> the System is started and stopped, respectively (please let me know if I
>>>>>>> am wrong about any of my understandings). register() seems to be similar
>>>>>>> to start() in that it will be called near the beginning of the System's
>>>>>>> life, although it gives access to the SystemStreamPartition and a given
>>>>>>> partition offset, correct? The poll() method seems to be where most of
>>>>>>> the action takes place, and going by its name, is it called often or at
>>>>>>> specified intervals? If so, how does this polling work and how is the
>>>>>>> interval specified?
>>>>>>> Also, the List of IncomingMessageEnvelopes that gets returned from
>>>>>>> poll(): are these then forwarded on to their specified
>>>>>>> SystemStreamPartitions (the first arg in the IncomingMessageEnvelope
>>>>>>> constructor)?
>>>>>>> 
>>>>>>> Anyway, thanks for your time and let me know how far off I am with my
>>>>>>> understanding of this (as I can't get anything working with my current
>>>>>>> System implementation). I will be happy to contribute back a Javadoc
>>>>>>> patch reflecting my amended understanding of this interface afterwards.
>>>>>>> 
>>>>>>> Jonathan
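The lifecycle asked about here can be modelled in a few lines. As the SystemConsumer Javadoc describes it (worth re-checking for the Samza version in use), register() is called once per input partition before start(), the container then calls poll() repeatedly with a timeout of its own choosing, and stop() is called on shutdown; the envelopes poll() returns end up in StreamTask.process() for the task that owns each partition. MiniConsumer and MiniContainer are hypothetical, Samza-free stand-ins that only demonstrate that ordering.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Samza-free model of the SystemConsumer lifecycle. The real interface
// lives in org.apache.samza.system and uses SystemStreamPartition and envelopes.
interface MiniConsumer {
    void register(String partition, String offset); // once per input partition
    void start();                                   // begin fetching (e.g. start reader threads)
    List<String> poll(long timeoutMs);              // called repeatedly by the container
    void stop();                                    // release resources
}

final class MiniContainer {
    /** Drives a consumer in the assumed order: register (per partition), start, poll..., stop. */
    static List<String> run(MiniConsumer consumer, List<String> partitions, int pollRounds) {
        List<String> delivered = new ArrayList<>();
        for (String p : partitions) {
            consumer.register(p, null); // null: no checkpointed offset yet
        }
        consumer.start();
        try {
            for (int i = 0; i < pollRounds; i++) {
                // The container, not the consumer, decides when to poll and how long to wait.
                delivered.addAll(consumer.poll(10));
            }
        } finally {
            consumer.stop();
        }
        return delivered; // in Samza these would be handed to StreamTask.process(...)
    }
}
```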
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 


Re: Clarification on SystemConsumer method functionality

Posted by Jonathan Poltak Samosir <sa...@gmail.com>.
Hi Chris,

Heh, nice pick up on my typo.

Thanks for the explanation on the importance of defining a key serializer, also.
And yes, sure enough after specifying those configs (and fixing my typo), everything worked as intended and the contents of my test file were put onto a stream which my StreamTask managed to receive.

I will open a JIRA for the error logging.

Thanks a lot for the ongoing support,
Jonathan

On 3 Feb 2014, at 9:26, Chris Riccomini <cr...@linkedin.com> wrote:

> Hey Jonathan,
> 
> Also, I noticed you have "serder" instead of "serde" for your msg serde
> config
> (https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties).
> In both cases (key and msg), this should be:
> 
> systems.kafka.samza.key.serde=string
> 
> systems.kafka.samza.msg.serde=json
> 
> 
> No 'r'. Didn't notice that before.
> 
> Cheers,
> Chris
> 
> On 2/3/14 9:21 AM, "Chris Riccomini" <cr...@linkedin.com> wrote:
> 
>> Hey Jonathan,
>> 
>> Aha. This error is caused because you're sending a non-byte-array key in
>> your StreamTask, and you don't have a key serializer defined. Samza, by
>> default, allows you to send messages to a system without a serializer
>> defined. If you do this, then it assumes you're giving it bytes.
>> 
>> Here are the configs you need:
>> 
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> 
>> systems.kafka.samza.key.serder=string
>> 
>> 
>> This is causing no messages to be sent, which explains why you're unable
>> to see any output from your job.
>> 
>> We could definitely improve the error messaging here. Could you open a
>> JIRA for this?
>> 
>> Cheers,
>> Chris
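A simplified model of the failure described here: with no key serde configured, the outgoing key is assumed to already be a byte[], so a String key fails with a ClassCastException, which would fit the truncated String cast error in the container log. KeyEncoding and STRING_SERDE are hypothetical names; this is not Samza's actual serialization code.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Simplified, hypothetical model: with no serde configured for a system's keys,
// the key object is assumed to already be a byte[].
final class KeyEncoding {
    static byte[] encodeKey(Object key, Function<Object, byte[]> keySerde) {
        if (keySerde == null) {
            // No serde: treat the key as raw bytes. A String key fails right here
            // with a ClassCastException.
            return (byte[]) key;
        }
        return keySerde.apply(key);
    }

    /** Stand-in for a registered string serde (cf. serializers.registry.string.class). */
    static final Function<Object, byte[]> STRING_SERDE =
            k -> ((String) k).getBytes(StandardCharsets.UTF_8);
}
```

With a key serde pointing at a registered string serde, the second branch applies and the key is encoded before sending.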


Re: Clarification on SystemConsumer method functionality

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Jonathan,

Also, I noticed you have "serder" instead of "serde" for your msg serde
config
(https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties).
In both cases (key and msg), this should be:

systems.kafka.samza.key.serde=string

systems.kafka.samza.msg.serde=json


No 'r'. Didn't notice that before.

Cheers,
Chris
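Both mistakes in this thread (no string serde registered for the key, and "serder" instead of "serde") could be caught before submitting a job with a small check over the properties file. SerdeConfigCheck is hypothetical and not part of Samza; it only uses java.util.Properties and assumes the systems.<system>.samza.key.serde / msg.serde and serializers.registry.<name>.class key names that appear in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for a job's properties; not part of Samza.
final class SerdeConfigCheck {
    static List<String> problems(Properties props) {
        List<String> found = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            // Mistake 1: a ".serder" key is not the ".serde" setting Samza reads, so it has no effect.
            if (key.endsWith(".serder")) {
                String fixed = key.substring(0, key.length() - ".serder".length()) + ".serde";
                found.add(key + ": misspelled, did you mean " + fixed + "?");
            }
            // Mistake 2: a key/msg serde that names a serializer nobody registered.
            if (key.startsWith("systems.")
                    && (key.endsWith(".samza.key.serde") || key.endsWith(".samza.msg.serde"))) {
                String name = props.getProperty(key);
                if (props.getProperty("serializers.registry." + name + ".class") == null) {
                    found.add(key + ": serde '" + name + "' is not registered");
                }
            }
        }
        Collections.sort(found); // stringPropertyNames() has no defined order
        return found;
    }
}
```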

On 2/3/14 9:21 AM, "Chris Riccomini" <cr...@linkedin.com> wrote:

>Hey Jonathan,
>
>Ah ah. This error is caused because you're sending a non-byte-array key in
>your StreamTask, and you don't have a key serializer defined. Samza, by
>default, allows you to send messages to a system without a serializer
>defined. If you do this, then it assumes you're giving it bytes.
>
>Here's are the configs you need:
>
>  
>serializers.registry.string.class=org.apache.samza.serializers.StringSerde
>F
>actory
>
>  systems.kafka.samza.key.serder=string
>
>
>This is causing no messages to be sent, which explains why you're unable
>to see any output from your job.
>
>We could definitely improve the error messaging here. Could you open a
>JIRA for this?
>
>Cheers,
>Chris
>
>On 1/31/14 4:24 PM, "Jonathan Poltak Samosir" <jo...@gmail.com>
>wrote:
>
>>Hi Chris,
>>
>>Oh dear, that's rather embarrassing, although it makes perfect sense now.
>>
>>Here is the first ~900 lines of the second container log (that warning
>>continues to repeat):
>>https://gist.github.com/poltak/8745919/raw/7990b0199fafde9bf6da00aa4b87b2
>>e
>>14fb0e717/gistfile1.txt
>>
>>There definitely seems to be an issue there. Unfortunately, I cannot work
>>out where exactly the problem lies since the String cast error message is
>>truncated. Do you know of any easy way to get the rest of this message?
>>
>>Thank you for your patience as well. It is very much appreciated!
>>
>>Jonathan
>>
>>
>>On 31 Jan 2014, at 11:20, Chris Riccomini <cr...@linkedin.com>
>>wrote:
>>
>>> Hey Jonathan,
>>> 
>>> Could you send the logs for the actual container? The AM isn't actually
>>> running your code--it just manages and tells YARN to start a SECOND
>>> container to run your code. :)
>>> 
>>> For example, in the AM logs, you'll see:
>>> 
>>> 2014-01-30 18:36:46 SamzaAppMasterTaskManager [INFO] Claimed task ID 0
>>>for
>>> container container_1391135766129_0001_01_000002 on node s1
>>> 
>>>(http://s1:8042/node/containerlogs/container_1391135766129_0001_01_00000
>>>2
>>>).
>>> 
>>> This is the log path to the container that's running your code.
>>> 
>>> Alternatively, you can find the second container by clicking on the
>>> ApplicationMaster link in the YARN RM web UI, and then clicking on the
>>> link in the "running containers" section that ends with _000002.
>>> 
>>> If you could grab that log and send it here, I can take a look and help
>>> you figure out what's going on.
>>> 
>>> Cheers,
>>> Chris
>>> 
>>> On 1/30/14 7:23 PM, "Jonathan Poltak Samosir"
>>><jo...@gmail.com>
>>> wrote:
>>> 
>>>> Hi Chris,
>>>> 
>>>> Thanks for your thoughts.
>>>> 
>>>> 1. I will definitely put file reading on another thread, I just wanted
>>>>to
>>>> get it working for now with a very simple test file before I move on
>>>>and
>>>> re-implement my code to be more robust. Thanks for the pointers on
>>>> checking for space before calling BlockingEnvelopeMap.put(), also. I
>>>>will
>>>> definitely come back to that afterwards.
>>>> 
>>>> 2. Again, will definitely revisit this point later on as well as the
>>>>file
>>>> offset tip.
>>>> 
>>>> 3. Here are the logs:
>>>> application-master.log:
>>>> https://gist.github.com/poltak/8726030
>>>> 
>>>> gc.log (probably not needed, but it's there):
>>>> https://gist.github.com/poltak/8726004
>>>> 
>>>> stdout:
>>>> https://gist.github.com/poltak/8726036
>>>> 
>>>> stderr was clean.
>>>> 
>>>> Not sure if you will find anything useful out of those, but I'm sure
>>>>your
>>>> understanding of it will greatly outweigh mine.
>>>> 
>>>> Thanks,
>>>> Jonathan
>>>> 
>>>> 
>>>> On 30 Jan 2014, at 15:47, Chris Riccomini <cr...@linkedin.com>
>>>>wrote:
>>>> 
>>>>> Hey Jonathan,
>>>>> 
>>>>> Another follow-on thought. You're currently using null for the
>>>>>offset,
>>>>> but
>>>>> you could very easily use filepos, and then fseek to the filepos when
>>>>>a
>>>>> SystemStreamPartition is instantiated. This would give you the
>>>>>ability
>>>>> to
>>>>> "pick up where you left off" rather than re-reading the whole file
>>>>>every
>>>>> time your SamzaContainer starts.
>>>>> 
>>>>> Cheers,
>>>>> Chris
>>>>> 
>>>>> On 1/30/14 3:43 PM, "Chris Riccomini" <cr...@linkedin.com>
>>>>>wrote:
>>>>> 
>>>>>> Hey Jonathan,
>>>>>> 
>>>>>> I took a look at your code. Nothing looks horribly wrong at first
>>>>>> glance.
>>>>>> A couple of thoughts:
>>>>>> 
>>>>>> 1. You're reading the full file and loading it into the
>>>>>> BlockingEnvelopeMap in your start() method. This is OK if the file
>>>>>>is
>>>>>> small, and will ALWAYS be small. If the file is NOT small, you need
>>>>>>to
>>>>>> move the file reading to another thread, and only
>>>>>> BlockingEnvelopeMap.put() when there is space. The start/stop
>>>>>>methods
>>>>>> would then start and stop your reader thread. The "when there is
>>>>>>space"
>>>>>> behavior can be implemented in one of two ways: 1) check
>>>>>> BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines,
>>>>>> and
>>>>>> block if it's above some threshold or 2) override
>>>>>> BlockingEnvelopeMap.newBlockingQueue
>>>>>> To use a bounded queue, which will automatically force your reader
>>>>>> thread
>>>>>> to block if the queue is full.
>>>>>> 2. An alternative input path style would be to just define the input
>>>>>> path
>>>>>> as the stream name. For example,
>>>>>> task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the
>>>>>> MedicalDataConsumer.register method call
>>>>>> systemStreamPartition.getStream
>>>>>> to get the file path, and instantiate the file reader there. The
>>>>>> advantage
>>>>>> to this approach is it means you only need one MedicaDataConsumer to
>>>>>> read
>>>>>> from N number of files, rather than having one MedicalDataConsumer
>>>>>>(and
>>>>>> system) per file.
>>>>>> 3. Can you send the output log when running your job?
>>>>>> 
>>>>>> 
>>>>>> Cheers,
>>>>>> Chris
>>>>>> 
>>>>>> On 1/30/14 2:56 PM, "Jonathan Poltak Samosir"
>>>>>> <jo...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Thanks for the help, Chris!
>>>>>>> 
>>>>>>> The Javadocs you updated certainly did help my understanding of the
>>>>>>> SystemConsumer interface, although I still have not been able to
>>>>>>>get
>>>>>>> my
>>>>>>> System to work.
>>>>>>> 
>>>>>>> I have now resorted to trying to extend the BlockingEnvelopeMap
>>>>>>> implementation of SystemConsumer, as it seemed a lot simpler for
>>>>>>>what
>>>>>>> I
>>>>>>> want to do plus the hello-samza example uses it so I can use it as
>>>>>>>a
>>>>>>> reference example.
>>>>>>> 
>>>>>>> Even simply putting a call to 'put()' at the beginning of the
>>>>>>> 'start()'
>>>>>>> method with a debug message, nothing is received by my simple
>>>>>>> StreamTask
>>>>>>> (which simply forwards what is received from my System's stream
>>>>>>>onto a
>>>>>>> Kafka stream for now). As this is the case, I think there is a flaw
>>>>>>> in my
>>>>>>> understanding of something much more fundamental here relating to
>>>>>>> Samza
>>>>>>> System... I am sure there is something very simple that I am
>>>>>>>missing
>>>>>>> with
>>>>>>> my implementation, as I have based it directly off the
>>>>>>> WikipediaSystem of
>>>>>>> which I feel I have a pretty thorough understanding of now.
>>>>>>> 
>>>>>>> If anyone has time to look at my implementations for
>>>>>>>SystemConsumer,
>>>>>>> SystemFactory and the previously mentioned StreamTask's config file
>>>>>>> (they're all very simple and "too-the-point", since I'm just trying
>>>>>>>to
>>>>>>> get things working), it would be very much appreciated.
>>>>>>> 
>>>>>>> SystemFactory implementation:
>>>>>>> 
>>>>>>> 
>>>>>>>https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikiped
>>>>>>>i
>>>>>>>a/
>>>>>>> sr
>>>>>>> c
>>>>>>> 
>>>>>>> 
>>>>>>>/main/java/samza/examples/wikipedia/system/MedicalDataSystemFactory.
>>>>>>>j
>>>>>>>av
>>>>>>> a
>>>>>>> 
>>>>>>> SystemConsumer implementation:
>>>>>>> 
>>>>>>> 
>>>>>>>https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikiped
>>>>>>>i
>>>>>>>a/
>>>>>>> sr
>>>>>>> c
>>>>>>> /main/java/samza/examples/wikipedia/system/MedicalDataConsumer.java
>>>>>>> 
>>>>>>> StreamTask config (which specifies input from my System):
>>>>>>> 
>>>>>>> 
>>>>>>>https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-pac
>>>>>>>k
>>>>>>>ag
>>>>>>> e/
>>>>>>> s
>>>>>>> rc/main/config/medical-data-feed.properties
>>>>>>> 
>>>>>>> And yes, once I get it working and reading a file into a Samza Stream,
>>>>>>> I will be happy to submit a patch.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Jonathan
>>>>>>> 
>>>>>>> 
>>>>>>> On 30 Jan 2014, at 10:26, Chris Riccomini <cr...@linkedin.com> wrote:
>>>>>>> 
>>>>>>>> Hey Jonathan,
>>>>>>>> 
>>>>>>>> I've attempted to answer your questions by updating the Javadocs. :)
>>>>>>>> 
>>>>>>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>>>>>>> 
>>>>>>>> Let me know if anything doesn't make sense.
>>>>>>>> 
>>>>>>>> Also, it'd be awesome if you could contribute your file reader
>>>>>>>> SystemConsumer. This seems like it'd be really useful for a lot of
>>>>>>>> things. Could you open a JIRA and submit a patch when you're ready?
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Chris
>>>>>>>> 
>>>>>>>> On 1/29/14 11:14 PM, "Jonathan Poltak Samosir"
>>>>>>>> <jo...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Hello,
>>>>>>>>> 
>>>>>>>>> I'm currently trying to write a simple Samza System that reads from a
>>>>>>>>> file and puts the contents onto a Samza-compatible stream. I've been
>>>>>>>>> basing it on the hello-samza WikipediaSystem example so far. The
>>>>>>>>> SystemFactory implementation seemed pretty straightforward (get the
>>>>>>>>> path to the file from config and return a SystemConsumer that reads
>>>>>>>>> the file at that path), although I am not 100% sure what the purpose
>>>>>>>>> of each of the SystemConsumer interface methods is.
>>>>>>>>> 
>>>>>>>>> start() and stop() seem fairly self-explanatory, getting called when
>>>>>>>>> the System is started and stopped, respectively (please let me know
>>>>>>>>> if any of my understanding is wrong). register() seems similar to
>>>>>>>>> start() in that it is called near the beginning of the System's
>>>>>>>>> life, but it also gives access to the SystemStreamPartition and a
>>>>>>>>> given partition offset, correct? The poll() method seems to be where
>>>>>>>>> most of the action takes place; going by its name, is it called
>>>>>>>>> often, or at specified intervals? If so, how does this polling work,
>>>>>>>>> and how is the interval specified?
>>>>>>>>> Also, the List of IncomingMessageEnvelopes returned from poll():
>>>>>>>>> are these then forwarded on to their specified SystemStreamPartitions
>>>>>>>>> (the first arg in the IncomingMessageEnvelope constructor)?
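As a rough illustration of the lifecycle asked about above (register, then start, then repeated poll calls, then stop), here is a toy, stdlib-only Java model. ToyConsumer and ToyEnvelope are hypothetical stand-ins, not Samza's SystemConsumer or IncomingMessageEnvelope, and the real poll() signature differs; the SystemConsumer Javadocs Chris links to are the authoritative contract. What it shows: poll() is driven repeatedly by the caller, waits at most the given timeout for data, and returns whatever is ready.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for IncomingMessageEnvelope: partition, offset, payload.
final class ToyEnvelope {
    final String partition;
    final String offset;
    final String message;

    ToyEnvelope(String partition, String offset, String message) {
        this.partition = partition;
        this.offset = offset;
        this.message = message;
    }
}

// Toy consumer mirroring a register/start/poll/stop lifecycle (not Samza's API).
final class ToyConsumer {
    private final Map<String, String> startOffsets = new HashMap<>();
    private final LinkedBlockingQueue<ToyEnvelope> queue = new LinkedBlockingQueue<>();

    // Called once per partition before start(), with the offset to resume from (may be null).
    void register(String partition, String offset) {
        startOffsets.put(partition, offset);
    }

    // Begin producing; this toy just enqueues one fixed message per registered partition.
    void start() {
        for (String p : startOffsets.keySet()) {
            queue.add(new ToyEnvelope(p, "0", "hello from " + p));
        }
    }

    // Called repeatedly by the caller's run loop: waits at most timeoutMs for the
    // first message, then drains whatever else is already queued.
    List<ToyEnvelope> poll(long timeoutMs) {
        List<ToyEnvelope> out = new ArrayList<>();
        try {
            ToyEnvelope first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                out.add(first);
                queue.drainTo(out);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status for the caller
        }
        return out;
    }

    // Shut down; the toy simply discards anything not yet polled.
    void stop() {
        queue.clear();
    }
}
```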
>>>>>>>>> 
>>>>>>>>> Anyway, thanks for your time, and let me know how far off I am with
>>>>>>>>> my understanding of this (as I can't get anything working with my
>>>>>>>>> current system implementation). I will be happy to contribute back a
>>>>>>>>> Javadoc patch reflecting my amended understanding of this interface
>>>>>>>>> afterwards.
>>>>>>>>> 
>>>>>>>>> Jonathan
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>>
>


Re: Clarification on SystemConsumer method functionality

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Jonathan,

Aha. This error occurs because you're sending a non-byte-array key from
your StreamTask, and you don't have a key serializer defined. By default,
Samza allows you to send messages to a system without a serializer
defined. If you do this, it assumes you're giving it bytes.

Here are the configs you need:

  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
  systems.kafka.samza.key.serde=string


This is causing no messages to be sent, which explains why you're unable
to see any output from your job.
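To make the failure mode described above concrete, here is a toy Java model of the outgoing-key path. ToyKeyEncoder is hypothetical and is not Samza's internal code. With no key serde configured, the key is assumed to already be a byte[] and is cast, so a String key throws ClassCastException; with a string serde (roughly what StringSerdeFactory provides, modeled here simply as UTF-8 encoding), the key is encoded first.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Toy model of outgoing-key handling; not Samza's actual implementation.
final class ToyKeyEncoder {
    // Hypothetical: null means "no key serde configured for this system".
    private final Function<Object, byte[]> keySerde;

    ToyKeyEncoder(Function<Object, byte[]> keySerde) {
        this.keySerde = keySerde;
    }

    byte[] encode(Object key) {
        if (keySerde == null) {
            // No serde: the key is assumed to already be bytes, so a String key
            // fails right here with a ClassCastException.
            return (byte[]) key;
        }
        return keySerde.apply(key);
    }

    // Rough analogue of a string serde: UTF-8 encode the String key.
    static byte[] stringSerde(Object key) {
        return ((String) key).getBytes(StandardCharsets.UTF_8);
    }
}
```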

We could definitely improve the error messaging here. Could you open a
JIRA for this?

Cheers,
Chris

On 1/31/14 4:24 PM, "Jonathan Poltak Samosir" <jo...@gmail.com>
wrote:

>Hi Chris,
>
>Oh dear, that's rather embarrassing, although it makes perfect sense now.
>
>Here are the first ~900 lines of the second container log (that warning
>continues to repeat):
>https://gist.github.com/poltak/8745919/raw/7990b0199fafde9bf6da00aa4b87b2e14fb0e717/gistfile1.txt
>
>There definitely seems to be an issue there. Unfortunately, I cannot work
>out where exactly the problem lies since the String cast error message is
>truncated. Do you know of any easy way to get the rest of this message?
>
>Thank you for your patience as well. It is very much appreciated!
>
>Jonathan
>
>
>On 31 Jan 2014, at 11:20, Chris Riccomini <cr...@linkedin.com> wrote:
>
>> Hey Jonathan,
>> 
>> Could you send the logs for the actual container? The AM isn't actually
>> running your code--it just manages and tells YARN to start a SECOND
>> container to run your code. :)
>> 
>> For example, in the AM logs, you'll see:
>> 
>> 2014-01-30 18:36:46 SamzaAppMasterTaskManager [INFO] Claimed task ID 0 for
>> container container_1391135766129_0001_01_000002 on node s1
>> (http://s1:8042/node/containerlogs/container_1391135766129_0001_01_000002).
>> 
>> This is the log path to the container that's running your code.
>> 
>> Alternatively, you can find the second container by clicking on the
>> ApplicationMaster link in the YARN RM web UI, and then clicking on the
>> link in the "running containers" section that ends with _000002.
>> 
>> If you could grab that log and send it here, I can take a look and help
>> you figure out what's going on.
>> 
>> Cheers,
>> Chris
>> 
>> On 1/30/14 7:23 PM, "Jonathan Poltak Samosir" <jo...@gmail.com> wrote:
>> 
>>> Hi Chris,
>>> 
>>> Thanks for your thoughts.
>>> 
>>> 1. I will definitely put file reading on another thread; I just wanted to
>>> get it working for now with a very simple test file before I move on and
>>> re-implement my code to be more robust. Thanks also for the pointers on
>>> checking for space before calling BlockingEnvelopeMap.put(). I will
>>> definitely come back to that afterwards.
>>> 
>>> 2. Again, I will definitely revisit this point later on, as well as the
>>> file offset tip.
>>> 
>>> 3. Here are the logs:
>>> application-master.log:
>>> https://gist.github.com/poltak/8726030
>>> 
>>> gc.log (probably not needed, but it's there):
>>> https://gist.github.com/poltak/8726004
>>> 
>>> stdout:
>>> https://gist.github.com/poltak/8726036
>>> 
>>> stderr was clean.
>>> 
>>> I'm not sure if you will find anything useful in those, but I'm sure your
>>> understanding of them will greatly outweigh mine.
>>> 
>>> Thanks,
>>> Jonathan
>>> 
>>> 
>>> On 30 Jan 2014, at 15:47, Chris Riccomini <cr...@linkedin.com> wrote:
>>> 
>>>> Hey Jonathan,
>>>> 
>>>> Another follow-on thought. You're currently using null for the offset,
>>>> but you could very easily use filepos, and then fseek to the filepos when
>>>> a SystemStreamPartition is instantiated. This would give you the ability
>>>> to "pick up where you left off" rather than re-reading the whole file
>>>> every time your SamzaContainer starts.
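The "pick up where you left off" suggestion above can be sketched in Java, with RandomAccessFile.seek standing in for fseek. This is a minimal sketch under stated assumptions: OffsetLineReader is hypothetical, the offset string is assumed to be the byte position just after the last line processed (your checkpointing has to agree on that convention), and a null offset means "start from the beginning", matching the null currently being used.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reads lines starting at a byte offset and records, for each
// line, the byte position where the NEXT line starts, so that value can be saved
// as the offset and passed back in on restart.
final class OffsetLineReader {
    static final class Line {
        final String text;
        final long nextOffset;

        Line(String text, long nextOffset) {
            this.text = text;
            this.nextOffset = nextOffset;
        }
    }

    // offset == null means "no saved position": start at the beginning of the file.
    static List<Line> readFrom(String path, String offset) throws IOException {
        List<Line> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            file.seek(offset == null ? 0L : Long.parseLong(offset)); // Java analogue of fseek
            String text;
            // Note: readLine() decodes each byte as Latin-1, fine for ASCII test files.
            while ((text = file.readLine()) != null) {
                lines.add(new Line(text, file.getFilePointer()));
            }
        }
        return lines;
    }
}
```

Storing "position after the line" rather than "position of the line" means a restart never re-emits the last processed line; the opposite convention would need the reader to skip one line after seeking.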
>>>> 
>>>> Cheers,
>>>> Chris
>>>> 
>>>> On 1/30/14 3:43 PM, "Chris Riccomini" <cr...@linkedin.com> wrote:
>>>> 
>>>>> Hey Jonathan,
>>>>> 
>>>>> I took a look at your code. Nothing looks horribly wrong at first glance.
>>>>> A couple of thoughts:
>>>>> 
>>>>> 1. You're reading the full file and loading it into the
>>>>> BlockingEnvelopeMap in your start() method. This is OK if the file is
>>>>> small, and will ALWAYS be small. If the file is NOT small, you need to
>>>>> move the file reading to another thread, and only call
>>>>> BlockingEnvelopeMap.put() when there is space. The start/stop methods
>>>>> would then start and stop your reader thread. The "when there is space"
>>>>> behavior can be implemented in one of two ways: 1) check
>>>>> BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines, and
>>>>> block if it's above some threshold, or 2) override
>>>>> BlockingEnvelopeMap.newBlockingQueue to use a bounded queue, which will
>>>>> automatically force your reader thread to block if the queue is full.
>>>>> 2. An alternative input path style would be to just define the input
>>>>> path as the stream name. For example,
>>>>> task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the
>>>>> MedicalDataConsumer.register method call systemStreamPartition.getStream
>>>>> to get the file path, and instantiate the file reader there. The
>>>>> advantage of this approach is that you only need one MedicalDataConsumer
>>>>> to read from N files, rather than one MedicalDataConsumer (and system)
>>>>> per file.
>>>>> 3. Can you send the output log when running your job?
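The second "when there is space" option in point 1, a bounded queue that makes the reader thread block when it is full, can be illustrated with plain java.util.concurrent. This sketch uses an ArrayBlockingQueue directly rather than overriding BlockingEnvelopeMap.newBlockingQueue, and BoundedLineFeeder, its capacity, and the in-memory lines are hypothetical; it only demonstrates the back-pressure behavior.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical reader: a background thread feeds lines into a bounded queue.
// put() blocks whenever the queue is full, so the reader can never run more than
// `capacity` lines ahead of whoever drains the queue.
final class BoundedLineFeeder {
    private final BlockingQueue<String> queue;
    private final Thread reader;

    BoundedLineFeeder(List<String> lines, int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.reader = new Thread(() -> {
            try {
                for (String line : lines) {
                    queue.put(line); // blocks while the queue is at capacity
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() interrupts us; exit quietly
            }
        }, "file-reader");
        this.reader.setDaemon(true);
    }

    // Roughly where a consumer's start()/stop() would start and stop the reader thread.
    void start() { reader.start(); }
    void stop() { reader.interrupt(); }

    BlockingQueue<String> queue() { return queue; }
}
```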
>>>>> 
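Point 2, using the file path as the stream name, hinges on how a task.inputs entry is split into system and stream. The sketch below assumes the split happens at the first '.', so a path containing dots (like test.csv) stays intact in the stream name; check that assumption against your Samza version. InputSpec is hypothetical, standing in for what register() would read via systemStreamPartition.getStream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for task.inputs-style entries such as "medicaldata./N/u/poltak/test.csv".
final class InputSpec {
    final String system;
    final String stream; // in this scheme, the file path to read

    private InputSpec(String system, String stream) {
        this.system = system;
        this.stream = stream;
    }

    // Split on the FIRST '.', so later dots in the path (".csv") stay in the stream name.
    static InputSpec parse(String entry) {
        int idx = entry.indexOf('.');
        if (idx <= 0 || idx == entry.length() - 1) {
            throw new IllegalArgumentException("expected system.stream, got: " + entry);
        }
        return new InputSpec(entry.substring(0, idx), entry.substring(idx + 1));
    }

    // task.inputs is a comma-separated list, so one system can serve N files.
    static List<InputSpec> parseAll(String taskInputs) {
        List<InputSpec> specs = new ArrayList<>();
        for (String entry : taskInputs.split(",")) {
            specs.add(parse(entry.trim()));
        }
        return specs;
    }
}
```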
>>>>> 
>>>>> Cheers,
>>>>> Chris
>>>>> 
>>>>> On 1/30/14 2:56 PM, "Jonathan Poltak Samosir"
>>>>> <jo...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Thanks for the help, Chris!
>>>>>> 
>>>>>> The Javadocs you updated certainly did help my understanding of the
>>>>>> SystemConsumer interface, although I still have not been able to get my
>>>>>> System to work.
>>>>>> 