Posted to dev@samza.apache.org by sgg <sg...@gmail.com> on 2013/12/07 13:18:05 UTC

Example Initable Task

Does anyone have a working example of a Samza job using an InitableTask?

It wasn't clear from the documentation how to specify the input parameters to the init() method. Is it a matter of adding lines to the config file that look like the following?
task.foo.bar = someVal

Also, when I tried to run a simple example with an InitableTask, I got an error. There seems to be a problem when Samza attempts to instantiate the class.


Here is the sample task I am trying to run:
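import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class SimpleSamzaTask implements InitableTask {
  private static final SystemStream OUTPUT_STREAM =
      new SystemStream("kafka", "simpleout");

  public SimpleSamzaTask() {
    super();
  }

  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    System.out.println("hello world");
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, "hello: " + envelope.getMessage()));
  }

  @Override
  public void init(Config arg0, TaskContext arg1) throws Exception {
    // TODO Auto-generated method stub
    System.out.println("in init");
  }
}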

Thoughts?

Re: Example Initable Task

Posted by sgg <sg...@gmail.com>.
yep, see 'em now.  Thanks again.
On Dec 9, 2013, at 10:42 PM, Chris Riccomini <cr...@linkedin.com> wrote:



Re: Example Initable Task

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey sgg,

If you're using YARN, which it sounds like you are, your System.out.println
output should be piped to the stdout file of the container (not the
ApplicationMaster). To find the container, go to the YARN web UI and click
the ApplicationMaster link for your Samza job. This leads you to the
ApplicationMaster's web UI (confusing, I know), which has a link to your
Samza job's containers. If you're playing with hello-samza, the container
will be something like container_12345678_1234536_2. That last "2" is the
container number within your Samza job: 1 is the ApplicationMaster (AM),
and 2 is the Samza container that's running your StreamTasks.

Cheers,
Chris
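As a small illustration of the container-numbering convention described above, here is a hypothetical Java helper (not part of Samza or YARN) that reads the container number from the last underscore-separated field of a container ID. It assumes IDs shaped like the example in the post; longer IDs, for example container_1386636000000_0001_01_000002, also keep the container number in the last field, so the same parsing applies. Treating 1 as the ApplicationMaster simply follows the convention stated in the reply.

```java
// Hypothetical helper, not part of Samza or YARN: reads the container number
// from the last underscore-separated field of a YARN container ID.
final class ContainerIds {
    private ContainerIds() {}

    static int containerNumber(String containerId) {
        if (containerId == null || !containerId.startsWith("container_")) {
            throw new IllegalArgumentException("not a container ID: " + containerId);
        }
        String last = containerId.substring(containerId.lastIndexOf('_') + 1);
        return Integer.parseInt(last); // "000002" parses to 2
    }

    // Convention from the explanation above: container 1 is the ApplicationMaster.
    static boolean isApplicationMaster(String containerId) {
        return containerNumber(containerId) == 1;
    }

    public static void main(String[] args) {
        String id = "container_12345678_1234536_2";
        String role = isApplicationMaster(id) ? "ApplicationMaster" : "Samza container";
        System.out.println(id + " -> container " + containerNumber(id) + " (" + role + ")");
    }
}
```

Running main prints "container_12345678_1234536_2 -> container 2 (Samza container)", which is the container whose stdout file holds the println output.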

On 12/9/13 7:37 PM, "sgg" <sg...@gmail.com> wrote:



Re: Example Initable Task

Posted by sgg <sg...@gmail.com>.
Hi Chris:
DOH! That was the problem. I made the change to implement both interfaces, and things ran fine! Thanks for the pointer!

BTW, where does the output from the System.out.println statements appear? That is, where is stdout being piped to? I expected to see it in the logs facility in the YARN web console, but these print statements don't appear in the YARN stdout log. Where should I be looking?

sgg
On Dec 9, 2013, at 10:14 PM, Chris Riccomini <cr...@linkedin.com> wrote:



Re: Example Initable Task

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey sgg,

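Ah, I didn't notice, but your code does not implement StreamTask, just
InitableTask.

These interfaces are like mix-ins: InitableTask only adds the optional
init() hook, while StreamTask is the interface that declares process(). You
MUST implement StreamTask. Try: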

  public class SimpleSamzaTask implements StreamTask, InitableTask {

    ...
  }

Sorry about that.

Cheers,
Chris
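To make the mix-in relationship concrete, here is a self-contained Java sketch. The StreamTask and InitableTask interfaces below are simplified stand-ins written for this example (not org.apache.samza.task.*, whose methods take envelopes, a collector, a coordinator, a Config, and a TaskContext), and TaskLoader is a hypothetical, heavily simplified container, not Samza's actual startup code. It shows why a class that declares only InitableTask can be rejected at instantiation time even though it has a process() method: the container requires a StreamTask, and InitableTask is only consulted as an optional extra.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins, NOT org.apache.samza.task.*; real signatures take
// (IncomingMessageEnvelope, MessageCollector, TaskCoordinator) and (Config, TaskContext).
interface StreamTask {
    void process(String message);
}

interface InitableTask {
    void init(Map<String, String> config) throws Exception;
}

// Mirrors the original post: has process() and init(), but only declares InitableTask.
class InitOnlyTask implements InitableTask {
    public void process(String message) { System.out.println("hello: " + message); }
    @Override public void init(Map<String, String> config) { System.out.println("in init"); }
}

// The fix suggested above: declare both interfaces.
class BothTask implements StreamTask, InitableTask {
    String fooBar; // filled in by init()
    @Override public void process(String message) { System.out.println("hello: " + message); }
    @Override public void init(Map<String, String> config) { fooBar = config.get("task.foo.bar"); }
}

// Hypothetical container logic, not Samza's actual code.
final class TaskLoader {
    private TaskLoader() {}

    static StreamTask load(Class<?> taskClass, Map<String, String> config) throws Exception {
        Object instance = taskClass.getDeclaredConstructor().newInstance();
        // Every task must be a StreamTask; this cast throws ClassCastException for InitOnlyTask.
        StreamTask task = (StreamTask) instance;
        // InitableTask is an optional mix-in: init() runs only if the class implements it.
        if (instance instanceof InitableTask) {
            ((InitableTask) instance).init(config);
        }
        return task;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> config = new HashMap<>();
        config.put("task.foo.bar", "someVal");

        StreamTask ok = load(BothTask.class, config);
        ok.process("world");
        System.out.println(((BothTask) ok).fooBar);

        try {
            load(InitOnlyTask.class, config);
        } catch (ClassCastException e) {
            System.out.println("InitOnlyTask rejected: " + e.getClass().getSimpleName());
        }
    }
}
```

Running TaskLoader.main prints "hello: world", then "someVal", then "InitOnlyTask rejected: ClassCastException". The real container may fail with a different exception or message; the sketch only illustrates that StreamTask is mandatory and InitableTask is additive.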

On 12/9/13 5:45 PM, "sgg" <sg...@gmail.com> wrote:



Re: Example Initable Task

Posted by sgg <sg...@gmail.com>.
OK, thanks Chris. But I still need to figure out why the init() method is not getting called; in fact, the entire Samza job fails. It works if the task implements StreamTask (obviously without invoking init()), but at least the task runs. When I change it to implement InitableTask as shown, the job fails. It seems Samza cannot instantiate the task object. I was hoping there would be a working example that would let me see what a correct startup sequence looks like.

sgg
On Dec 9, 2013, at 12:27 PM, Chris Riccomini <cr...@linkedin.com> wrote:



Re: Example Initable Task

Posted by Chris Riccomini <cr...@linkedin.com>.
Hi there,

Your task looks good. Your init() method receives two parameters: a Config
and a TaskContext. The config object has *all* config properties defined in
your job's config. If you were to write:

@Override
public void init(Config config, TaskContext context) throws Exception {
  System.out.println(config.get("task.foo.bar"));
}


And you had task.foo.bar=someVal, you'd then expect to see your task print
"someVal" once for each partition that the Samza container is responsible
for. For example, if you had a single Samza container
(yarn.container.count=1, or using LocalJobFactory), and you had defined a
single input stream that had 4 partitions, your logs would show "someVal"
printed four times (one for each partition that the Samza container is
responsible for processing).

Cheers,
Chris
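Following the description in this reply, here is a hypothetical Java simulation (not Samza code) of a single container that creates one task instance per input partition and passes each instance the full job config in init(). It assumes the job config is a standard .properties file, so a line written as task.foo.bar = someVal, with spaces around the '=', is read as key task.foo.bar and value someVal. A plain Map stands in for Samza's Config, and EchoTask is an illustrative name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical simulation, not Samza code: one container, one task instance per
// input partition, and every instance receives the same job config in init().
final class PerPartitionInitDemo {
    private PerPartitionInitDemo() {}

    // Stand-in task; a plain Map plays the role of Samza's Config.
    static final class EchoTask {
        final int partition;
        String fooBar;

        EchoTask(int partition) {
            this.partition = partition;
        }

        void init(Map<String, String> config) {
            fooBar = config.get("task.foo.bar");
            System.out.println(fooBar);
        }
    }

    // Assumes java.util.Properties syntax: spaces around '=' are not part of the key or value.
    static Map<String, String> loadConfig(String propertiesText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        Map<String, String> config = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            config.put(key, props.getProperty(key));
        }
        return config;
    }

    // Creates and initializes one task per partition the container is responsible for.
    static List<EchoTask> startContainer(int partitionCount, Map<String, String> config) {
        List<EchoTask> tasks = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            EchoTask task = new EchoTask(partition);
            task.init(config);
            tasks.add(task);
        }
        return tasks;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> config = loadConfig("task.foo.bar = someVal\n");
        startContainer(4, config); // single input stream with 4 partitions
    }
}
```

With four partitions, main prints "someVal" four times, matching the single-container, four-partition example in the reply.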

On 12/7/13 4:18 AM, "sgg" <sg...@gmail.com> wrote:
