Posted to dev@samza.apache.org by Bruno Bonacci <br...@gmail.com> on 2015/09/12 22:47:14 UTC

Runtime Execution Model

Hi,
I'm looking for additional documentation on the different RUNTIME
EXECUTION MODELS behind the different `job.factory.class` options.

I'm particularly interested in how each factory (ThreadJobFactory,
ProcessJobFactory and YarnJobFactory) creates tasks, consumes and processes
messages out of Kafka, and in the threading model used.

I did a few tests with the ThreadJobFactory consuming from a Kafka
topic with 5 partitions, and I was expecting it to use multiple
threads to consume/process the different partitions; however, it is
using only one thread at runtime.

Is there any way to tell Samza to use multiple processing threads (1 per
partition)?


Thanks
Bruno

Re: Runtime Execution Model

Posted by br...@gmail.com.
Many thanks Yi

Bye
Bruno

> On 14 Sep 2015, at 23:49, Yi Pan <ni...@gmail.com> wrote:
> 
> Hi, Bruno,
> 
> The number of partitions consumed by a single task is also configurable via
> the partition assignment policies (job.systemstreampartition.
> grouper.factory). By default, there are two partition assignment policies
> implemented: org.apache.samza.container.grouper.stream.GroupByPartitionFactory
> and org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory.
> The detailed explanation is available here:
> http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html
> 
> Thanks!
> 
> -Yi
> 
>> On Mon, Sep 14, 2015 at 3:37 PM, <br...@gmail.com> wrote:
>> 
>> Hi Yi,
>> 
>> Does a single task consume from a single partition or it consumes from
>> more/all partitions?
>> 
>> Thanks
>> Bruno
>> 
>>> On 14 Sep 2015, at 23:22, Yi Pan <ni...@gmail.com> wrote:
>>> 
>>> Hi, Bruno,
>>> 
>>> The number of containers are configurable in YarnJobFactory via
>>> yarn.container.count.
>>> Each container is a single threaded model and you can run multiple tasks
>> in
>>> a single container.
>>> At maximum, you can have as many containers as the number of tasks in
>> this
>>> config to achieve 1 task / thread.
>>> 
>>> Hope that clarifies the config a bit more for you.
>>> 
>>> Thanks!
>>> 
>>> -Yi
>>> 
>>> On Mon, Sep 14, 2015 at 3:16 PM, Bruno Bonacci <br...@gmail.com>
>>> wrote:
>>> 
>>>> Thanks Yan for writing me back,
>>>> 
>>>> That's ok for ThreadJobFactory and ProcessJobFactory but what about the
>>>> YarnJobFactory?
>>>> How many task/executors will be spawning?
>>>> 
>>>> 
>>>> Bruno
>>>> 
>>>>> On Mon, Sep 14, 2015 at 7:08 PM, Yan Fang <ya...@gmail.com>
>> wrote:
>>>>> 
>>>>> Hi Bruno,
>>>>> 
>>>>> AFAIK, there is no existing JobFactory that brings as many threads as
>> the
>>>>> partition number. But I think nothing stops you to implement this: you
>>>> can
>>>>> get the partition information from the JobCoordinator, and then bring
>> as
>>>>> many threads as the partition/task number.
>>>>> 
>>>>> Since the two local factories (ThreadJobFactory and ProcessJobFactory)
>>>> are
>>>>> mainly for development, there is no additional document. But most of
>> the
>>>>> code here
>>>>> <
>> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
>>>>> is
>>>>> self-explained.
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Fang, Yan
>>>>> yanfang724@gmail.com
>>>>> 
>>>>> On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <
>> bruno.bonacci@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> I'm looking for additional documentation on the different RUNTIME
>>>>>> EXECUTION MODELS of the different `job.factory.class`.
>>>>>> 
>>>>>> I'm particularly interested on how each factory (ThreadJobFactory,
>>>>>> ProcessJobFactory and YarnJobFactory) will create tasks consume and
>>>>> process
>>>>>> messages out of Kafka and the thread model used.
>>>>>> 
>>>>>> I did a few tests with the ThreadJob factory consuming out of a kafka
>>>>>> topic with 5 partitions and I was expecting that it would use multiple
>>>>>> threads to consume/process the different partitions, however it is
>>>>>> using only one thread at runtime.
>>>>>> 
>>>>>> Is there any way to tell Samza to use multiple processing threads (1
>>>> per
>>>>>> partition)??
>>>>>> 
>>>>>> 
>>>>>> Thanks
>>>>>> Bruno
>> 

Re: Runtime Execution Model

Posted by Yi Pan <ni...@gmail.com>.
Hi, Bruno,

The number of partitions consumed by a single task is also configurable via
the partition assignment policies (job.systemstreampartition.grouper.factory).
By default, there are two partition assignment policies implemented:
org.apache.samza.container.grouper.stream.GroupByPartitionFactory and
org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory.
The detailed explanation is available here:
http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html
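
(As a concrete illustration, and not something taken from that page: in a job's
.properties file, the per-partition grouper would be selected roughly like
this, using the 0.9 key and class names quoted above.)

job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionFactory
# or, to map each system-stream-partition to its own task (as the name suggests):
# job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory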

Thanks!

-Yi

On Mon, Sep 14, 2015 at 3:37 PM, <br...@gmail.com> wrote:

> Hi Yi,
>
> Does a single task consume from a single partition or it consumes from
> more/all partitions?
>
> Thanks
> Bruno
>
> > On 14 Sep 2015, at 23:22, Yi Pan <ni...@gmail.com> wrote:
> >
> > Hi, Bruno,
> >
> > The number of containers are configurable in YarnJobFactory via
> > yarn.container.count.
> > Each container is a single threaded model and you can run multiple tasks
> in
> > a single container.
> > At maximum, you can have as many containers as the number of tasks in
> this
> > config to achieve 1 task / thread.
> >
> > Hope that clarifies the config a bit more for you.
> >
> > Thanks!
> >
> > -Yi
> >
> > On Mon, Sep 14, 2015 at 3:16 PM, Bruno Bonacci <br...@gmail.com>
> > wrote:
> >
> >> Thanks Yan for writing me back,
> >>
> >> That's ok for ThreadJobFactory and ProcessJobFactory but what about the
> >> YarnJobFactory?
> >> How many task/executors will be spawning?
> >>
> >>
> >> Bruno
> >>
> >>> On Mon, Sep 14, 2015 at 7:08 PM, Yan Fang <ya...@gmail.com>
> wrote:
> >>>
> >>> Hi Bruno,
> >>>
> >>> AFAIK, there is no existing JobFactory that brings as many threads as
> the
> >>> partition number. But I think nothing stops you to implement this: you
> >> can
> >>> get the partition information from the JobCoordinator, and then bring
> as
> >>> many threads as the partition/task number.
> >>>
> >>> Since the two local factories (ThreadJobFactory and ProcessJobFactory)
> >> are
> >>> mainly for development, there is no additional document. But most of
> the
> >>> code here
> >>> <
> >>
> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
> >>> is
> >>> self-explained.
> >>>
> >>> Thanks,
> >>>
> >>> Fang, Yan
> >>> yanfang724@gmail.com
> >>>
> >>> On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <
> bruno.bonacci@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>> I'm looking for additional documentation on the different RUNTIME
> >>>> EXECUTION MODELS of the different `job.factory.class`.
> >>>>
> >>>> I'm particularly interested on how each factory (ThreadJobFactory,
> >>>> ProcessJobFactory and YarnJobFactory) will create tasks consume and
> >>> process
> >>>> messages out of Kafka and the thread model used.
> >>>>
> >>>> I did a few tests with the ThreadJob factory consuming out of a kafka
> >>>> topic with 5 partitions and I was expecting that it would use multiple
> >>>> threads to consume/process the different partitions, however it is
> >>>> using only one thread at runtime.
> >>>>
> >>>> Is there any way to tell Samza to use multiple processing threads (1
> >> per
> >>>> partition)??
> >>>>
> >>>>
> >>>> Thanks
> >>>> Bruno
> >>
>

Re: Runtime Execution Model

Posted by br...@gmail.com.
Hi Yi,

Does a single task consume from a single partition, or does it consume from more/all partitions?

Thanks
Bruno

> On 14 Sep 2015, at 23:22, Yi Pan <ni...@gmail.com> wrote:
> 
> Hi, Bruno,
> 
> The number of containers are configurable in YarnJobFactory via
> yarn.container.count.
> Each container is a single threaded model and you can run multiple tasks in
> a single container.
> At maximum, you can have as many containers as the number of tasks in this
> config to achieve 1 task / thread.
> 
> Hope that clarifies the config a bit more for you.
> 
> Thanks!
> 
> -Yi
> 
> On Mon, Sep 14, 2015 at 3:16 PM, Bruno Bonacci <br...@gmail.com>
> wrote:
> 
>> Thanks Yan for writing me back,
>> 
>> That's ok for ThreadJobFactory and ProcessJobFactory but what about the
>> YarnJobFactory?
>> How many task/executors will be spawning?
>> 
>> 
>> Bruno
>> 
>>> On Mon, Sep 14, 2015 at 7:08 PM, Yan Fang <ya...@gmail.com> wrote:
>>> 
>>> Hi Bruno,
>>> 
>>> AFAIK, there is no existing JobFactory that brings as many threads as the
>>> partition number. But I think nothing stops you to implement this: you
>> can
>>> get the partition information from the JobCoordinator, and then bring as
>>> many threads as the partition/task number.
>>> 
>>> Since the two local factories (ThreadJobFactory and ProcessJobFactory)
>> are
>>> mainly for development, there is no additional document. But most of the
>>> code here
>>> <
>> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
>>> is
>>> self-explained.
>>> 
>>> Thanks,
>>> 
>>> Fang, Yan
>>> yanfang724@gmail.com
>>> 
>>> On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <br...@gmail.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> I'm looking for additional documentation on the different RUNTIME
>>>> EXECUTION MODELS of the different `job.factory.class`.
>>>> 
>>>> I'm particularly interested on how each factory (ThreadJobFactory,
>>>> ProcessJobFactory and YarnJobFactory) will create tasks consume and
>>> process
>>>> messages out of Kafka and the thread model used.
>>>> 
>>>> I did a few tests with the ThreadJob factory consuming out of a kafka
>>>> topic with 5 partitions and I was expecting that it would use multiple
>>>> threads to consume/process the different partitions, however it is
>>>> using only one thread at runtime.
>>>> 
>>>> Is there any way to tell Samza to use multiple processing threads (1
>> per
>>>> partition)??
>>>> 
>>>> 
>>>> Thanks
>>>> Bruno
>> 

Re: Runtime Execution Model

Posted by Yi Pan <ni...@gmail.com>.
Hi, Bruno,

The number of containers is configurable in YarnJobFactory via
yarn.container.count.
Each container is single-threaded, and you can run multiple tasks in
a single container.
At most, you can set this config to as many containers as there are tasks,
which gives you 1 task / thread.
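
(As a hedged, concrete example for Bruno's 5-partition topic: with the default
per-partition grouping there are 5 tasks, so setting the count to 5 gives one
task per container and hence one task per thread. The YarnJobFactory class name
below is assumed from the org.apache.samza.job.yarn package; it is not quoted
in this thread.)

job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
yarn.container.count=5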

Hope that clarifies the config a bit more for you.

Thanks!

-Yi

On Mon, Sep 14, 2015 at 3:16 PM, Bruno Bonacci <br...@gmail.com>
wrote:

> Thanks Yan for writing me back,
>
> That's ok for ThreadJobFactory and ProcessJobFactory but what about the
> YarnJobFactory?
> How many task/executors will be spawning?
>
>
> Bruno
>
> On Mon, Sep 14, 2015 at 7:08 PM, Yan Fang <ya...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > AFAIK, there is no existing JobFactory that brings as many threads as the
> > partition number. But I think nothing stops you to implement this: you
> can
> > get the partition information from the JobCoordinator, and then bring as
> > many threads as the partition/task number.
> >
> > Since the two local factories (ThreadJobFactory and ProcessJobFactory)
> are
> > mainly for development, there is no additional document. But most of the
> > code here
> > <
> >
> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
> > >
> > is
> > self-explained.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang724@gmail.com
> >
> > On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <br...@gmail.com>
> > wrote:
> >
> > > Hi,
> > > I'm looking for additional documentation on the different RUNTIME
> > > EXECUTION MODELS of the different `job.factory.class`.
> > >
> > > I'm particularly interested on how each factory (ThreadJobFactory,
> > > ProcessJobFactory and YarnJobFactory) will create tasks consume and
> > process
> > > messages out of Kafka and the thread model used.
> > >
> > > I did a few tests with the ThreadJob factory consuming out of a kafka
> > > topic with 5 partitions and I was expecting that it would use multiple
> > > threads to consume/process the different partitions, however it is
> > > using only one thread at runtime.
> > >
> > > Is there any way to tell Samza to use multiple processing threads (1
> per
> > > partition)??
> > >
> > >
> > > Thanks
> > > Bruno
> > >
> >
>

Re: Runtime Execution Model

Posted by Bruno Bonacci <br...@gmail.com>.
Thanks Yan for writing me back,

That's ok for ThreadJobFactory and ProcessJobFactory but what about the
YarnJobFactory?
How many tasks/executors will it spawn?


Bruno

On Mon, Sep 14, 2015 at 7:08 PM, Yan Fang <ya...@gmail.com> wrote:

> Hi Bruno,
>
> AFAIK, there is no existing JobFactory that brings as many threads as the
> partition number. But I think nothing stops you to implement this: you can
> get the partition information from the JobCoordinator, and then bring as
> many threads as the partition/task number.
>
> Since the two local factories (ThreadJobFactory and ProcessJobFactory) are
> mainly for development, there is no additional document. But most of the
> code here
> <
> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
> >
> is
> self-explained.
>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <br...@gmail.com>
> wrote:
>
> > Hi,
> > I'm looking for additional documentation on the different RUNTIME
> > EXECUTION MODELS of the different `job.factory.class`.
> >
> > I'm particularly interested on how each factory (ThreadJobFactory,
> > ProcessJobFactory and YarnJobFactory) will create tasks consume and
> process
> > messages out of Kafka and the thread model used.
> >
> > I did a few tests with the ThreadJob factory consuming out of a kafka
> > topic with 5 partitions and I was expecting that it would use multiple
> > threads to consume/process the different partitions, however it is
> > using only one thread at runtime.
> >
> > Is there any way to tell Samza to use multiple processing threads (1 per
> > partition)??
> >
> >
> > Thanks
> > Bruno
> >
>

Re: Runtime Execution Model

Posted by Lukas Steiblys <lu...@doubledutch.me>.
Hi Yan,

If I understand correctly, a good way to increase parallelism in 
ThreadJobFactory in 0.9.1 is to simply pass in N as the container count into 
JobCoordinator(config, N) and then spin up a new ThreadJob for each 
container. Does that sound right?
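
(For what it's worth, here is a rough, untested Scala sketch of just the
threading pattern described above. `containerRunnables` stands in for whatever
per-container SamzaContainer instances JobCoordinator(config, N) would yield,
so none of the actual Samza 0.9.1 wiring is shown.)

object OneThreadPerContainer {
  // Start one thread per container runnable and wait for all of them to exit.
  def run(containerRunnables: Seq[Runnable]): Unit = {
    val threads = containerRunnables.zipWithIndex.map { case (r, id) =>
      new Thread(r, s"samza-container-$id")
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }

  def main(args: Array[String]): Unit = {
    // Dummy runnables only, so the sketch is self-contained and runnable.
    val demo = (0 until 3).map { i =>
      new Runnable { def run(): Unit = println(s"container $i would consume its partitions here") }
    }
    run(demo)
  }
}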

I am not sure how this is affected by 0.10.0 yet as I see a lot of changes 
in that code.

Lukas

-----Original Message----- 
From: Yan Fang
Sent: Monday, September 14, 2015 11:08 AM
To: dev@samza.apache.org
Subject: Re: Runtime Execution Model

Hi Bruno,

AFAIK, there is no existing JobFactory that brings as many threads as the
partition number. But I think nothing stops you to implement this: you can
get the partition information from the JobCoordinator, and then bring as
many threads as the partition/task number.

Since the two local factories (ThreadJobFactory and ProcessJobFactory) are
mainly for development, there is no additional document. But most of the
code here
<https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local>
is
self-explained.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <br...@gmail.com>
wrote:

> Hi,
> I'm looking for additional documentation on the different RUNTIME
> EXECUTION MODELS of the different `job.factory.class`.
>
> I'm particularly interested on how each factory (ThreadJobFactory,
> ProcessJobFactory and YarnJobFactory) will create tasks consume and 
> process
> messages out of Kafka and the thread model used.
>
> I did a few tests with the ThreadJob factory consuming out of a kafka
> topic with 5 partitions and I was expecting that it would use multiple
> threads to consume/process the different partitions, however it is
> using only one thread at runtime.
>
> Is there any way to tell Samza to use multiple processing threads (1 per
> partition)??
>
>
> Thanks
> Bruno
> 


Re: Runtime Execution Model

Posted by Lukas Steiblys <lu...@doubledutch.me>.
Yes, I was expecting a separate thread for each task, which is not the case.
I am not looking to scale a single job beyond a single machine, though.

Lukas


-----Original Message----- 
From: Yi Pan
Sent: Wednesday, September 16, 2015 5:04 PM
To: dev@samza.apache.org
Subject: Re: Runtime Execution Model

Hi, Lukas,


I currently use one process in a Docker container. My assumption was that
> it already spawned multiple processes so I was comfortable with that
> solution until we have to scale some job beyond one machine.
>
>
I assume that you are referring "spawned multiple *threads*" instead of
"spawned multiple processes"? I don't think that is right, if you are
referring to "multiple threads for multiple tasks in a Samza job". The
ProcessJobFactory will launch another process w/ a single container to run
the actual job. That container is still single threaded and is assigned all
input partitions. If you are looking to scale one job beyond one machine,
you will see that the current ProcessJobFactory does not support that.



> Docker containers are restarted automatically on failure and we collect
> metrics from the metrics log in Kafka in case there is any serious fault
> and we need to be alerted. So far this worked fine, but our set up is
> fairly small. I would like to see the ProcessJobFactory spawn multiple
> processes automatically since the amount of data we're processing is
> growing quickly.


This is aligned w/ Yan's comment to enhance the ProcessJobFactory to
improve the parallelism. In addition, you may want to look into Monal's
patch (from Netflix's team) on SAMZA-41. That will allow an intermediate
solution to scale ProcessJob across multiple machines.


>
>
> Lukas
>
> -----Original Message----- From: Yan Fang
> Sent: Wednesday, September 16, 2015 3:45 PM
>
> To: dev@samza.apache.org
> Subject: Re: Runtime Execution Model
>
> -- Hi Lukas,
>
> I want to learn more from your production environment. How do you use
> ProcessJobFactory
> in Docker containers? Do you use one ProcessJobFactory process all the
> tasks, or spawn out as many threads as the task number? How is the
> fault-tolerance?
>
>
> -- Hi Yi,
>
> * Any progress in your side, in terms of the standalone job? (Chris' patch
> is big, :)
>
> *  Invert the JobCoordinator to the standalone Samza process s.t. the
> leader
> process of the Samza job becomes the JobCoordinator
>        Currently, we run the JobCoordinator first, and then Yarn talks to
> the JobCoordinator. Isn't it enough so far?
>
> *  Make the partition assignment as pluggable model to distribute the
> tasks to
> all Samza processes in a job-group in coordination.
>       I think the reason for this is for the Kafka's new feature.The API
> design needs to be compatible with Kafka.
>
> *  Make Samza process multi-threaded while maintaining the per-task
> single-threaded
> programming model for the users
>       Do we already have this, or need to add that? This I think can be
> done in current ProcessJob. We can have the same number of threads as the
> tasks.
>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Tue, Sep 15, 2015 at 10:54 AM, Yi Pan <ni...@gmail.com> wrote:
>
> Hi, all,
>>
>> Thanks for pitching in for the improvement plan. We have actually
>> discussed
>> this for a while now. In a complete view, I think that there are the
>> following issues need to be addressed:
>> 1) Currently, the steps involved to launch a Samza process are too 
>> complex
>> and intertwined with YARN.
>> 2) The Samza partition assignment is embedded within YARN AppMaster
>> implementation, which makes it difficult to run the job outside YARN
>> environment
>>
>> We have actually already started some work to address the above issues:
>> 1) SAMZA-516: support standalone Samza jobs. Chris has started this work
>> and has a proto-type patch available. This allows a ZK-based coordination
>> to start standalone Samza processes w/o YARN
>>
>> There are also planned changes to allow de-coupling of Samza job
>> coordination logic from YARN AppMaster:
>> 1) SAMZA-680 Invert the JobCoordinator and AM logic. This would allow us
>> to
>> keep the Samza-specific JobCoordinator logic independent from
>> cluster-management systems.
>>
>> There is one more thing I am thinking: we may want to make the partition
>> assignment logic as a pluggable module, such that we can choose different
>> coordination mechanism in partition assignment as needed (e.g. ZK-based,
>> cluster-management based, or Kafka-based coordination).
>>
>>
>> Ultimately, I think that we should try to refactor the current job
>> launching model to the following:
>> 1) Make standalone Samza process the standard Samza process model
>> 2) Invert the JobCoordinator to the standalone Samza process s.t. the
>> leader process of the Samza job becomes the JobCoordinator
>> 3) Make the partition assignment as pluggable model to distribute the
>> tasks
>> to all Samza processes in a job-group in coordination
>> 4) Make launching of Samza process agnostic of cluster-management 
>> systems.
>> The cluster-management systems will simply provide the functionality of
>> placing the standard Samza processes to actual available nodes
>> 5) Make Samza process multi-threaded while maintaining the per-task
>> single-threaded programming model for the users.
>>
>> Thoughts?
>>
>> -Yi
>>
>>
>>
>>
>>
>> On Tue, Sep 15, 2015 at 9:50 AM, Hannes Stockner <
>> hannes.stockner@gmail.com>
>> wrote:
>>
>> > +1
>> >
>> >
>> > On Tue, Sep 15, 2015 at 5:43 PM, Bruno Bonacci <bruno.bonacci@gmail.com
>> >
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > I support what Lukas saying. Samza packaging requirements are not
>> > friendly,
>> > > I use the ThreadJobFactory for the same reason.
>> > >
>> > > Bruno
>> > >
>> > > On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys <lukas@doubledutch.me
>> >
>> > > wrote:
>> > >
>> > > > Hi Yan,
>> > > >
>> > > > We use Samza in a production environment using ProcessJobFactory in
>> > > Docker
>> > > > containers because it greatly simplifies our deployment process and
>> > makes
>> > > > much better use of resources.
>> > > >
>> > > > Is there any plan to make the ThreadJobFactory or ProcessJobFactory
>> > > > multithreaded? I will look into doing that myself, but I think it
>> might
>> > > be
>> > > > useful to implement this for everyone. I am sure there are plenty 
>> > > > of
>> > > cases
>> > > > where people do not want to use YARN, but want more parallelism in
>> > their
>> > > > tasks.
>> > > >
>> > > > Lukas
>> > > >
>> > > > -----Original Message----- From: Yan Fang
>> > > > Sent: Monday, September 14, 2015 11:08 AM
>> > > > To: dev@samza.apache.org
>> > > > Subject: Re: Runtime Execution Model
>> > > >
>> > > >
>> > > > Hi Bruno,
>> > > >
>> > > > AFAIK, there is no existing JobFactory that brings as many threads
>> > > > as
>> > the
>> > > > partition number. But I think nothing stops you to implement this:
>> you
>> > > can
>> > > > get the partition information from the JobCoordinator, and then > >
>> > bring
>> > as
>> > > > many threads as the partition/task number.
>> > > >
>> > > > Since the two local factories (ThreadJobFactory and
>> ProcessJobFactory)
>> > > are
>> > > > mainly for development, there is no additional document. But most 
>> > > > of
>> > the
>> > > > code here
>> > > > <
>> > > >
>> > >
>> >
>>
>> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
>> > > > >
>> > > > is
>> > > > self-explained.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Fang, Yan
>> > > > yanfang724@gmail.com
>> > > >
>> > > > On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <
>> > bruno.bonacci@gmail.com>
>> > > > wrote:
>> > > >
>> > > > Hi,
>> > > >> I'm looking for additional documentation on the different RUNTIME
>> > > >> EXECUTION MODELS of the different `job.factory.class`.
>> > > >>
>> > > >> I'm particularly interested on how each factory (ThreadJobFactory,
>> > > >> ProcessJobFactory and YarnJobFactory) will create tasks consume 
>> > > >> and
>> > > >> process
>> > > >> messages out of Kafka and the thread model used.
>> > > >>
>> > > >> I did a few tests with the ThreadJob factory consuming out of a
>> kafka
>> > > >> topic with 5 partitions and I was expecting that it would use
>> multiple
>> > > >> threads to consume/process the different partitions, however it is
>> > > >> using only one thread at runtime.
>> > > >>
>> > > >> Is there any way to tell Samza to use multiple processing threads
>> > > >> (1
>> > per
>> > > >> partition)??
>> > > >>
>> > > >>
>> > > >> Thanks
>> > > >> Bruno
>> > > >>
>> > > >>
>> > > >
>> > >
>> >
>>
>>
> 


Re: Runtime Execution Model

Posted by Yi Pan <ni...@gmail.com>.
Hi, Lukas,


I currently use one process in a Docker container. My assumption was that
> it already spawned multiple processes so I was comfortable with that
> solution until we have to scale some job beyond one machine.
>
>
I assume that you mean "spawned multiple *threads*" rather than
"spawned multiple processes"? I don't think that is right, if you are
referring to "multiple threads for multiple tasks in a Samza job". The
ProcessJobFactory will launch another process w/ a single container to run
the actual job. That container is still single-threaded and is assigned all
input partitions. If you are looking to scale one job beyond one machine,
you will see that the current ProcessJobFactory does not support that.
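
(To make that concrete, a hedged sketch of the two local options in a job's
.properties file; the class names assume the samza-core job/local package
linked earlier in this thread. Either way, per the above, the job runs as one
single-threaded container holding all input partitions.)

# container runs as a thread inside the launching JVM:
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
# or: container runs in a separate process launched by the job runner:
# job.factory.class=org.apache.samza.job.local.ProcessJobFactory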


> Docker containers are restarted automatically on failure and we collect
> metrics from the metrics log in Kafka in case there is any serious fault
> and we need to be alerted. So far this worked fine, but our set up is
> fairly small. I would like to see the ProcessJobFactory spawn multiple
> processes automatically since the amount of data we're processing is
> growing quickly.


This is aligned w/ Yan's comment about enhancing the ProcessJobFactory to
improve parallelism. In addition, you may want to look into Monal's
patch (from Netflix's team) on SAMZA-41. That offers an intermediate
solution for scaling ProcessJob across multiple machines.


>
>
> Lukas
>
> -----Original Message----- From: Yan Fang
> Sent: Wednesday, September 16, 2015 3:45 PM
>
> To: dev@samza.apache.org
> Subject: Re: Runtime Execution Model
>
> -- Hi Lukas,
>
> I want to learn more from your production environment. How do you use
> ProcessJobFactory
> in Docker containers? Do you use one ProcessJobFactory process all the
> tasks, or spawn out as many threads as the task number? How is the
> fault-tolerance?
>
>
> -- Hi Yi,
>
> * Any progress in your side, in terms of the standalone job? (Chris' patch
> is big, :)
>
> *  Invert the JobCoordinator to the standalone Samza process s.t. the
> leader
> process of the Samza job becomes the JobCoordinator
>        Currently, we run the JobCoordinator first, and then Yarn talks to
> the JobCoordinator. Isn't it enough so far?
>
> *  Make the partition assignment as pluggable model to distribute the
> tasks to
> all Samza processes in a job-group in coordination.
>       I think the reason for this is for the Kafka's new feature.The API
> design needs to be compatible with Kafka.
>
> *  Make Samza process multi-threaded while maintaining the per-task
> single-threaded
> programming model for the users
>       Do we already have this, or need to add that? This I think can be
> done in current ProcessJob. We can have the same number of threads as the
> tasks.
>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Tue, Sep 15, 2015 at 10:54 AM, Yi Pan <ni...@gmail.com> wrote:
>
> Hi, all,
>>
>> Thanks for pitching in for the improvement plan. We have actually
>> discussed
>> this for a while now. In a complete view, I think that there are the
>> following issues need to be addressed:
>> 1) Currently, the steps involved to launch a Samza process are too complex
>> and intertwined with YARN.
>> 2) The Samza partition assignment is embedded within YARN AppMaster
>> implementation, which makes it difficult to run the job outside YARN
>> environment
>>
>> We have actually already started some work to address the above issues:
>> 1) SAMZA-516: support standalone Samza jobs. Chris has started this work
>> and has a proto-type patch available. This allows a ZK-based coordination
>> to start standalone Samza processes w/o YARN
>>
>> There are also planned changes to allow de-coupling of Samza job
>> coordination logic from YARN AppMaster:
>> 1) SAMZA-680 Invert the JobCoordinator and AM logic. This would allow us
>> to
>> keep the Samza-specific JobCoordinator logic independent from
>> cluster-management systems.
>>
>> There is one more thing I am thinking: we may want to make the partition
>> assignment logic as a pluggable module, such that we can choose different
>> coordination mechanism in partition assignment as needed (e.g. ZK-based,
>> cluster-management based, or Kafka-based coordination).
>>
>>
>> Ultimately, I think that we should try to refactor the current job
>> launching model to the following:
>> 1) Make standalone Samza process the standard Samza process model
>> 2) Invert the JobCoordinator to the standalone Samza process s.t. the
>> leader process of the Samza job becomes the JobCoordinator
>> 3) Make the partition assignment as pluggable model to distribute the
>> tasks
>> to all Samza processes in a job-group in coordination
>> 4) Make launching of Samza process agnostic of cluster-management systems.
>> The cluster-management systems will simply provide the functionality of
>> placing the standard Samza processes to actual available nodes
>> 5) Make Samza process multi-threaded while maintaining the per-task
>> single-threaded programming model for the users.
>>
>> Thoughts?
>>
>> -Yi
>>
>>
>>
>>
>>
>> On Tue, Sep 15, 2015 at 9:50 AM, Hannes Stockner <
>> hannes.stockner@gmail.com>
>> wrote:
>>
>> > +1
>> >
>> >
>> > On Tue, Sep 15, 2015 at 5:43 PM, Bruno Bonacci <bruno.bonacci@gmail.com
>> >
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > I support what Lukas saying. Samza packaging requirements are not
>> > friendly,
>> > > I use the ThreadJobFactory for the same reason.
>> > >
>> > > Bruno
>> > >
>> > > On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys <lukas@doubledutch.me
>> >
>> > > wrote:
>> > >
>> > > > Hi Yan,
>> > > >
>> > > > We use Samza in a production environment using ProcessJobFactory in
>> > > Docker
>> > > > containers because it greatly simplifies our deployment process and
>> > makes
>> > > > much better use of resources.
>> > > >
>> > > > Is there any plan to make the ThreadJobFactory or ProcessJobFactory
>> > > > multithreaded? I will look into doing that myself, but I think it
>> might
>> > > be
>> > > > useful to implement this for everyone. I am sure there are plenty of
>> > > cases
>> > > > where people do not want to use YARN, but want more parallelism in
>> > their
>> > > > tasks.
>> > > >
>> > > > Lukas
>> > > >
>> > > > -----Original Message----- From: Yan Fang
>> > > > Sent: Monday, September 14, 2015 11:08 AM
>> > > > To: dev@samza.apache.org
>> > > > Subject: Re: Runtime Execution Model
>> > > >
>> > > >
>> > > > Hi Bruno,
>> > > >
>> > > > AFAIK, there is no existing JobFactory that brings as many threads
>> > > > as
>> > the
>> > > > partition number. But I think nothing stops you to implement this:
>> you
>> > > can
>> > > > get the partition information from the JobCoordinator, and then > >
>> > bring
>> > as
>> > > > many threads as the partition/task number.
>> > > >
>> > > > Since the two local factories (ThreadJobFactory and
>> ProcessJobFactory)
>> > > are
>> > > > mainly for development, there is no additional document. But most of
>> > the
>> > > > code here
>> > > > <
>> > > >
>> > >
>> >
>>
>> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
>> > > > >
>> > > > is
>> > > > self-explained.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Fang, Yan
>> > > > yanfang724@gmail.com
>> > > >
>> > > > On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <
>> > bruno.bonacci@gmail.com>
>> > > > wrote:
>> > > >
>> > > > Hi,
>> > > >> I'm looking for additional documentation on the different RUNTIME
>> > > >> EXECUTION MODELS of the different `job.factory.class`.
>> > > >>
>> > > >> I'm particularly interested on how each factory (ThreadJobFactory,
>> > > >> ProcessJobFactory and YarnJobFactory) will create tasks consume and
>> > > >> process
>> > > >> messages out of Kafka and the thread model used.
>> > > >>
>> > > >> I did a few tests with the ThreadJob factory consuming out of a
>> kafka
>> > > >> topic with 5 partitions and I was expecting that it would use
>> multiple
>> > > >> threads to consume/process the different partitions, however it is
>> > > >> using only one thread at runtime.
>> > > >>
>> > > >> Is there any way to tell Samza to use multiple processing threads
>> > > >> (1
>> > per
>> > > >> partition)??
>> > > >>
>> > > >>
>> > > >> Thanks
>> > > >> Bruno
>> > > >>
>> > > >>
>> > > >
>> > >
>> >
>>
>>
>

Re: Runtime Execution Model

Posted by Lukas Steiblys <lu...@doubledutch.me>.
Hi Yan,

I currently use one process in a Docker container. My assumption was that it 
already spawned multiple processes so I was comfortable with that solution 
until we have to scale some job beyond one machine.

Docker containers are restarted automatically on failure, and we collect
metrics from the metrics log in Kafka in case there is any serious fault and
we need to be alerted. So far this has worked fine, but our setup is fairly
small. I would like to see the ProcessJobFactory spawn multiple processes
automatically, since the amount of data we're processing is growing quickly.

Lukas

-----Original Message----- 
From: Yan Fang
Sent: Wednesday, September 16, 2015 3:45 PM
To: dev@samza.apache.org
Subject: Re: Runtime Execution Model

-- Hi Lukas,

I want to learn more from your production environment. How do you use
ProcessJobFactory
in Docker containers? Do you use one ProcessJobFactory process all the
tasks, or spawn out as many threads as the task number? How is the
fault-tolerance?


-- Hi Yi,

* Any progress in your side, in terms of the standalone job? (Chris' patch
is big, :)

*  Invert the JobCoordinator to the standalone Samza process s.t. the leader
process of the Samza job becomes the JobCoordinator
        Currently, we run the JobCoordinator first, and then Yarn talks to
the JobCoordinator. Isn't it enough so far?

*  Make the partition assignment as pluggable model to distribute the tasks 
to
all Samza processes in a job-group in coordination.
       I think the reason for this is for the Kafka's new feature.The API
design needs to be compatible with Kafka.

*  Make Samza process multi-threaded while maintaining the per-task
single-threaded
programming model for the users
       Do we already have this, or need to add that? This I think can be
done in current ProcessJob. We can have the same number of threads as the
tasks.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Tue, Sep 15, 2015 at 10:54 AM, Yi Pan <ni...@gmail.com> wrote:

> Hi, all,
>
> Thanks for pitching in for the improvement plan. We have actually 
> discussed
> this for a while now. In a complete view, I think that there are the
> following issues need to be addressed:
> 1) Currently, the steps involved to launch a Samza process are too complex
> and intertwined with YARN.
> 2) The Samza partition assignment is embedded within YARN AppMaster
> implementation, which makes it difficult to run the job outside YARN
> environment
>
> We have actually already started some work to address the above issues:
> 1) SAMZA-516: support standalone Samza jobs. Chris has started this work
> and has a proto-type patch available. This allows a ZK-based coordination
> to start standalone Samza processes w/o YARN
>
> There are also planned changes to allow de-coupling of Samza job
> coordination logic from YARN AppMaster:
> 1) SAMZA-680 Invert the JobCoordinator and AM logic. This would allow us 
> to
> keep the Samza-specific JobCoordinator logic independent from
> cluster-management systems.
>
> There is one more thing I am thinking: we may want to make the partition
> assignment logic as a pluggable module, such that we can choose different
> coordination mechanism in partition assignment as needed (e.g. ZK-based,
> cluster-management based, or Kafka-based coordination).
>
>
> Ultimately, I think that we should try to refactor the current job
> launching model to the following:
> 1) Make standalone Samza process the standard Samza process model
> 2) Invert the JobCoordinator to the standalone Samza process s.t. the
> leader process of the Samza job becomes the JobCoordinator
> 3) Make the partition assignment as pluggable model to distribute the 
> tasks
> to all Samza processes in a job-group in coordination
> 4) Make launching of Samza process agnostic of cluster-management systems.
> The cluster-management systems will simply provide the functionality of
> placing the standard Samza processes to actual available nodes
> 5) Make Samza process multi-threaded while maintaining the per-task
> single-threaded programming model for the users.
>
> Thoughts?
>
> -Yi
>
>
>
>
>
> On Tue, Sep 15, 2015 at 9:50 AM, Hannes Stockner <
> hannes.stockner@gmail.com>
> wrote:
>
> > +1
> >
> >
> > On Tue, Sep 15, 2015 at 5:43 PM, Bruno Bonacci <br...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I support what Lukas saying. Samza packaging requirements are not
> > friendly,
> > > I use the ThreadJobFactory for the same reason.
> > >
> > > Bruno
> > >
> > > On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys <lu...@doubledutch.me>
> > > wrote:
> > >
> > > > Hi Yan,
> > > >
> > > > We use Samza in a production environment using ProcessJobFactory in
> > > Docker
> > > > containers because it greatly simplifies our deployment process and
> > makes
> > > > much better use of resources.
> > > >
> > > > Is there any plan to make the ThreadJobFactory or ProcessJobFactory
> > > > multithreaded? I will look into doing that myself, but I think it
> might
> > > be
> > > > useful to implement this for everyone. I am sure there are plenty of
> > > cases
> > > > where people do not want to use YARN, but want more parallelism in
> > their
> > > > tasks.
> > > >
> > > > Lukas
> > > >
> > > > -----Original Message----- From: Yan Fang
> > > > Sent: Monday, September 14, 2015 11:08 AM
> > > > To: dev@samza.apache.org
> > > > Subject: Re: Runtime Execution Model
> > > >
> > > >
> > > > Hi Bruno,
> > > >
> > > > AFAIK, there is no existing JobFactory that brings as many threads 
> > > > as
> > the
> > > > partition number. But I think nothing stops you to implement this:
> you
> > > can
> > > > get the partition information from the JobCoordinator, and then 
> > > > bring
> > as
> > > > many threads as the partition/task number.
> > > >
> > > > Since the two local factories (ThreadJobFactory and
> ProcessJobFactory)
> > > are
> > > > mainly for development, there is no additional document. But most of
> > the
> > > > code here
> > > > <
> > > >
> > >
> >
> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
> > > > >
> > > > is
> > > > self-explained.
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang724@gmail.com
> > > >
> > > > On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <
> > bruno.bonacci@gmail.com>
> > > > wrote:
> > > >
> > > > Hi,
> > > >> I'm looking for additional documentation on the different RUNTIME
> > > >> EXECUTION MODELS of the different `job.factory.class`.
> > > >>
> > > >> I'm particularly interested on how each factory (ThreadJobFactory,
> > > >> ProcessJobFactory and YarnJobFactory) will create tasks consume and
> > > >> process
> > > >> messages out of Kafka and the thread model used.
> > > >>
> > > >> I did a few tests with the ThreadJob factory consuming out of a
> kafka
> > > >> topic with 5 partitions and I was expecting that it would use
> multiple
> > > >> threads to consume/process the different partitions, however it is
> > > >> using only one thread at runtime.
> > > >>
> > > >> Is there any way to tell Samza to use multiple processing threads 
> > > >> (1
> > per
> > > >> partition)??
> > > >>
> > > >>
> > > >> Thanks
> > > >> Bruno
> > > >>
> > > >>
> > > >
> > >
> >
> 


Re: Runtime Execution Model

Posted by Yi Pan <ni...@gmail.com>.
Hi, Yan,

I put my thoughts below.


On Wed, Sep 16, 2015 at 3:45 PM, Yan Fang <ya...@gmail.com> wrote:

> * Any progress in your side, in terms of the standalone job? (Chris' patch
> is big, :)
>
>
Yeah, we have paused on it for a quarter and will try to pick it up in Q4.


> *  Invert the JobCoordinator to the standalone Samza process s.t. the
> leader
> process of the Samza job becomes the JobCoordinator
>         Currently, we run the JobCoordinator first, and then Yarn talks to
> the JobCoordinator. Isn't it enough so far?
>

The current logic is: the YARN AppMaster is started first, and inside the
AppMaster code the JobCoordinator is instantiated and performs the job-level
coordination work. Hence, the lifecycle of the JobCoordinator is tied to the
AppMaster. This model strongly assumes that YARN always starts a single
process as the AppMaster for the whole job before any container processes are
started, so that the JobCoordinator can perform job-level coordination work
before requesting the containers.

There are several reasons to invert the life cycles of the JobCoordinator and
the AppMaster:
1) The JobCoordinator should work w/o YARN. In the standalone Samza process,
we are removing the dependency on an external component (i.e. YARN) for leader
election (in YARN, a single AppMaster process is the leader for the group
of processes in a Samza job). The next step is: after the leader for a
group of Samza processes in a job is elected, the logic in the JobCoordinator
should be executed (i.e. partition assignment, coordinator-stream-based job
configuration). Then the job model is initialized and each Samza process can
start processing messages. In this model, the JobCoordinator should live w/o
the AppMaster.
2) The JobCoordinator should work w/ multiple different cluster management
systems. If we want to integrate w/ different cluster management systems (e.g.
Mesos) to launch processes on dynamic resource nodes, it is better to make the
JobCoordinator lifecycle not depend on any particular system's specific
component (e.g. the YARN AppMaster).


>
> *  Make the partition assignment as pluggable model to distribute the
> tasks to
> all Samza processes in a job-group in coordination.
>        I think the reason for this is for the Kafka's new feature.The API
> design needs to be compatible with Kafka.
>
>
Yes. I agree.


> *  Make Samza process multi-threaded while maintaining the per-task
> single-threaded
> programming model for the users
>        Do we already have this, or need to add that? This I think can be
> done in current ProcessJob. We can have the same number of threads as the
> tasks.
>

Not in the current code. Yeah, in the short term, I think that we can enhance
the current ProcessJobFactory to do this. In the long term, it may even be
good to make ProcessJob the standard Samza process model in all
environments (i.e. YARN/Mesos/standalone).


>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Tue, Sep 15, 2015 at 10:54 AM, Yi Pan <ni...@gmail.com> wrote:
>
> > Hi, all,
> >
> > Thanks for pitching in for the improvement plan. We have actually
> discussed
> > this for a while now. In a complete view, I think that there are the
> > following issues need to be addressed:
> > 1) Currently, the steps involved to launch a Samza process are too
> complex
> > and intertwined with YARN.
> > 2) The Samza partition assignment is embedded within YARN AppMaster
> > implementation, which makes it difficult to run the job outside YARN
> > environment
> >
> > We have actually already started some work to address the above issues:
> > 1) SAMZA-516: support standalone Samza jobs. Chris has started this work
> > and has a proto-type patch available. This allows a ZK-based coordination
> > to start standalone Samza processes w/o YARN
> >
> > There are also planned changes to allow de-coupling of Samza job
> > coordination logic from YARN AppMaster:
> > 1) SAMZA-680 Invert the JobCoordinator and AM logic. This would allow us
> to
> > keep the Samza-specific JobCoordinator logic independent from
> > cluster-management systems.
> >
> > There is one more thing I am thinking: we may want to make the partition
> > assignment logic as a pluggable module, such that we can choose different
> > coordination mechanism in partition assignment as needed (e.g. ZK-based,
> > cluster-management based, or Kafka-based coordination).
> >
> >
> > Ultimately, I think that we should try to refactor the current job
> > launching model to the following:
> > 1) Make standalone Samza process the standard Samza process model
> > 2) Invert the JobCoordinator to the standalone Samza process s.t. the
> > leader process of the Samza job becomes the JobCoordinator
> > 3) Make the partition assignment as pluggable model to distribute the
> tasks
> > to all Samza processes in a job-group in coordination
> > 4) Make launching of Samza process agnostic of cluster-management
> systems.
> > The cluster-management systems will simply provide the functionality of
> > placing the standard Samza processes to actual available nodes
> > 5) Make Samza process multi-threaded while maintaining the per-task
> > single-threaded programming model for the users.
> >
> > Thoughts?
> >
> > -Yi
> >
> >
> >
> >
> >
> > On Tue, Sep 15, 2015 at 9:50 AM, Hannes Stockner <
> > hannes.stockner@gmail.com>
> > wrote:
> >
> > > +1
> > >
> > >
> > > On Tue, Sep 15, 2015 at 5:43 PM, Bruno Bonacci <
> bruno.bonacci@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I support what Lukas saying. Samza packaging requirements are not
> > > friendly,
> > > > I use the ThreadJobFactory for the same reason.
> > > >
> > > > Bruno
> > > >
> > > > On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys <
> lukas@doubledutch.me>
> > > > wrote:
> > > >
> > > > > Hi Yan,
> > > > >
> > > > > We use Samza in a production environment using ProcessJobFactory in
> > > > Docker
> > > > > containers because it greatly simplifies our deployment process and
> > > makes
> > > > > much better use of resources.
> > > > >
> > > > > Is there any plan to make the ThreadJobFactory or ProcessJobFactory
> > > > > multithreaded? I will look into doing that myself, but I think it
> > might
> > > > be
> > > > > useful to implement this for everyone. I am sure there are plenty
> of
> > > > cases
> > > > > where people do not want to use YARN, but want more parallelism in
> > > their
> > > > > tasks.
> > > > >
> > > > > Lukas
> > > > >
> > > > > -----Original Message----- From: Yan Fang
> > > > > Sent: Monday, September 14, 2015 11:08 AM
> > > > > To: dev@samza.apache.org
> > > > > Subject: Re: Runtime Execution Model
> > > > >
> > > > >
> > > > > Hi Bruno,
> > > > >
> > > > > AFAIK, there is no existing JobFactory that brings as many threads
> as
> > > the
> > > > > partition number. But I think nothing stops you to implement this:
> > you
> > > > can
> > > > > get the partition information from the JobCoordinator, and then
> bring
> > > as
> > > > > many threads as the partition/task number.
> > > > >
> > > > > Since the two local factories (ThreadJobFactory and
> > ProcessJobFactory)
> > > > are
> > > > > mainly for development, there is no additional document. But most
> of
> > > the
> > > > > code here
> > > > > <
> > > > >
> > > >
> > >
> >
> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
> > > > > >
> > > > > is
> > > > > self-explained.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Fang, Yan
> > > > > yanfang724@gmail.com
> > > > >
> > > > > On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <
> > > bruno.bonacci@gmail.com>
> > > > > wrote:
> > > > >
> > > > > Hi,
> > > > >> I'm looking for additional documentation on the different RUNTIME
> > > > >> EXECUTION MODELS of the different `job.factory.class`.
> > > > >>
> > > > >> I'm particularly interested on how each factory (ThreadJobFactory,
> > > > >> ProcessJobFactory and YarnJobFactory) will create tasks consume
> and
> > > > >> process
> > > > >> messages out of Kafka and the thread model used.
> > > > >>
> > > > >> I did a few tests with the ThreadJob factory consuming out of a
> > kafka
> > > > >> topic with 5 partitions and I was expecting that it would use
> > multiple
> > > > >> threads to consume/process the different partitions, however it is
> > > > >> using only one thread at runtime.
> > > > >>
> > > > >> Is there any way to tell Samza to use multiple processing threads
> (1
> > > per
> > > > >> partition)??
> > > > >>
> > > > >>
> > > > >> Thanks
> > > > >> Bruno
> > > > >>
> > > > >>
> > > > >
> > > >
> > >
> >
>

Re: Runtime Execution Model

Posted by Yan Fang <ya...@gmail.com>.
-- Hi Lukas,

I want to learn more about your production environment. How do you use
ProcessJobFactory in Docker containers? Do you use one ProcessJobFactory
process for all the tasks, or spawn out as many threads as the number of
tasks? How is the fault tolerance handled?


-- Hi Yi,

* Any progress on your side, in terms of the standalone job? (Chris' patch
is big, :)

*  Invert the JobCoordinator to the standalone Samza process s.t. the leader
process of the Samza job becomes the JobCoordinator
        Currently, we run the JobCoordinator first, and then Yarn talks to
the JobCoordinator. Isn't that enough so far?

*  Make the partition assignment a pluggable model to distribute the tasks to
all Samza processes in a job-group in coordination.
       I think the reason for this is Kafka's new feature. The API
design needs to be compatible with Kafka.

*  Make the Samza process multi-threaded while maintaining the per-task
single-threaded programming model for the users
       Do we already have this, or do we need to add it? I think this can be
done in the current ProcessJob: we can have the same number of threads as the
tasks.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Tue, Sep 15, 2015 at 10:54 AM, Yi Pan <ni...@gmail.com> wrote:

> Hi, all,
>
> Thanks for pitching in for the improvement plan. We have actually discussed
> this for a while now. In a complete view, I think that there are the
> following issues need to be addressed:
> 1) Currently, the steps involved to launch a Samza process are too complex
> and intertwined with YARN.
> 2) The Samza partition assignment is embedded within YARN AppMaster
> implementation, which makes it difficult to run the job outside YARN
> environment
>
> We have actually already started some work to address the above issues:
> 1) SAMZA-516: support standalone Samza jobs. Chris has started this work
> and has a proto-type patch available. This allows a ZK-based coordination
> to start standalone Samza processes w/o YARN
>
> There are also planned changes to allow de-coupling of Samza job
> coordination logic from YARN AppMaster:
> 1) SAMZA-680 Invert the JobCoordinator and AM logic. This would allow us to
> keep the Samza-specific JobCoordinator logic independent from
> cluster-management systems.
>
> There is one more thing I am thinking: we may want to make the partition
> assignment logic as a pluggable module, such that we can choose different
> coordination mechanism in partition assignment as needed (e.g. ZK-based,
> cluster-management based, or Kafka-based coordination).
>
>
> Ultimately, I think that we should try to refactor the current job
> launching model to the following:
> 1) Make standalone Samza process the standard Samza process model
> 2) Invert the JobCoordinator to the standalone Samza process s.t. the
> leader process of the Samza job becomes the JobCoordinator
> 3) Make the partition assignment as pluggable model to distribute the tasks
> to all Samza processes in a job-group in coordination
> 4) Make launching of Samza process agnostic of cluster-management systems.
> The cluster-management systems will simply provide the functionality of
> placing the standard Samza processes to actual available nodes
> 5) Make Samza process multi-threaded while maintaining the per-task
> single-threaded programming model for the users.
>
> Thoughts?
>
> -Yi
>
>
>
>
>
> On Tue, Sep 15, 2015 at 9:50 AM, Hannes Stockner <
> hannes.stockner@gmail.com>
> wrote:
>
> > +1
> >
> >
> > On Tue, Sep 15, 2015 at 5:43 PM, Bruno Bonacci <br...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I support what Lukas saying. Samza packaging requirements are not
> > friendly,
> > > I use the ThreadJobFactory for the same reason.
> > >
> > > Bruno
> > >
> > > On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys <lu...@doubledutch.me>
> > > wrote:
> > >
> > > > Hi Yan,
> > > >
> > > > We use Samza in a production environment using ProcessJobFactory in
> > > Docker
> > > > containers because it greatly simplifies our deployment process and
> > makes
> > > > much better use of resources.
> > > >
> > > > Is there any plan to make the ThreadJobFactory or ProcessJobFactory
> > > > multithreaded? I will look into doing that myself, but I think it
> might
> > > be
> > > > useful to implement this for everyone. I am sure there are plenty of
> > > cases
> > > > where people do not want to use YARN, but want more parallelism in
> > their
> > > > tasks.
> > > >
> > > > Lukas
> > > >
> > > > -----Original Message----- From: Yan Fang
> > > > Sent: Monday, September 14, 2015 11:08 AM
> > > > To: dev@samza.apache.org
> > > > Subject: Re: Runtime Execution Model
> > > >
> > > >
> > > > Hi Bruno,
> > > >
> > > > AFAIK, there is no existing JobFactory that brings as many threads as
> > the
> > > > partition number. But I think nothing stops you to implement this:
> you
> > > can
> > > > get the partition information from the JobCoordinator, and then bring
> > as
> > > > many threads as the partition/task number.
> > > >
> > > > Since the two local factories (ThreadJobFactory and
> ProcessJobFactory)
> > > are
> > > > mainly for development, there is no additional document. But most of
> > the
> > > > code here
> > > > <
> > > >
> > >
> >
> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
> > > > >
> > > > is
> > > > self-explained.
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang724@gmail.com
> > > >
> > > > On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <
> > bruno.bonacci@gmail.com>
> > > > wrote:
> > > >
> > > > Hi,
> > > >> I'm looking for additional documentation on the different RUNTIME
> > > >> EXECUTION MODELS of the different `job.factory.class`.
> > > >>
> > > >> I'm particularly interested on how each factory (ThreadJobFactory,
> > > >> ProcessJobFactory and YarnJobFactory) will create tasks consume and
> > > >> process
> > > >> messages out of Kafka and the thread model used.
> > > >>
> > > >> I did a few tests with the ThreadJob factory consuming out of a
> kafka
> > > >> topic with 5 partitions and I was expecting that it would use
> multiple
> > > >> threads to consume/process the different partitions, however it is
> > > >> using only one thread at runtime.
> > > >>
> > > >> Is there any way to tell Samza to use multiple processing threads (1
> > per
> > > >> partition)??
> > > >>
> > > >>
> > > >> Thanks
> > > >> Bruno
> > > >>
> > > >>
> > > >
> > >
> >
>

Re: Runtime Execution Model

Posted by Yi Pan <ni...@gmail.com>.
Hi, all,

Thanks for pitching in on the improvement plan. We have actually discussed
this for a while now. Taking a complete view, I think the following issues
need to be addressed:
1) Currently, the steps involved in launching a Samza process are too complex
and intertwined with YARN.
2) The Samza partition assignment is embedded within the YARN AppMaster
implementation, which makes it difficult to run the job outside the YARN
environment.

We have actually already started some work to address the above issues:
1) SAMZA-516: support standalone Samza jobs. Chris has started this work
and has a prototype patch available. This allows ZK-based coordination
to start standalone Samza processes w/o YARN.

There are also planned changes to allow decoupling of the Samza job
coordination logic from the YARN AppMaster:
1) SAMZA-680: invert the JobCoordinator and AM logic. This would allow us to
keep the Samza-specific JobCoordinator logic independent of
cluster-management systems.

There is one more thing I have been thinking about: we may want to make the
partition assignment logic a pluggable module, so that we can choose a
different coordination mechanism for partition assignment as needed (e.g.
ZK-based, cluster-management-based, or Kafka-based coordination).
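
As a rough illustration of what "pluggable" could mean here (this is not an
existing Samza interface, just a sketch with invented names):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical plug-in point: given the input partitions and the number
    // of processes in the job group, decide which process owns which partitions.
    interface PartitionAssignor {
      Map<Integer, List<String>> assign(List<String> partitions, int numProcesses);
    }

    // One possible strategy: simple round-robin over the processes.
    class RoundRobinAssignor implements PartitionAssignor {
      public Map<Integer, List<String>> assign(List<String> partitions, int numProcesses) {
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
          assignment.computeIfAbsent(i % numProcesses, p -> new ArrayList<>())
                    .add(partitions.get(i));
        }
        return assignment;
      }
    }

A ZK-based or Kafka-based implementation would differ only in where the
resulting assignment is published and how the processes agree on it.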


Ultimately, I think we should try to refactor the current job-launching
model as follows:
1) Make the standalone Samza process the standard Samza process model
2) Move the JobCoordinator into the standalone Samza process, so that the
leader process of a Samza job becomes the JobCoordinator
3) Make partition assignment a pluggable model that distributes the tasks
across all Samza processes in a job group in a coordinated way
4) Make launching a Samza process agnostic of cluster-management systems.
The cluster-management system simply provides the functionality of placing
the standard Samza processes on the available nodes
5) Make the Samza process multi-threaded while maintaining the per-task
single-threaded programming model for users (see the sketch below)
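
For point 5, a sketch of one way to do that: give every task its own
single-threaded executor, so tasks run in parallel with each other while each
individual task still only ever sees one thread. The class and method names
below are illustrative, not Samza's actual API:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class PerTaskDispatcher {
      // One single-threaded executor per task: the process becomes
      // multi-threaded, but each task processes its messages serially.
      private final Map<String, ExecutorService> executors = new HashMap<>();

      PerTaskDispatcher(List<String> taskNames) {
        for (String task : taskNames) {
          executors.put(task, Executors.newSingleThreadExecutor());
        }
      }

      // Hand a unit of work to the task's own thread; the Runnable stands in
      // for the user's process() call on that task's messages.
      void dispatch(String taskName, Runnable work) {
        executors.get(taskName).submit(work);
      }
    }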

Thoughts?

-Yi

On Tue, Sep 15, 2015 at 9:50 AM, Hannes Stockner <ha...@gmail.com>
wrote:

> +1
>
>
> On Tue, Sep 15, 2015 at 5:43 PM, Bruno Bonacci <br...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I support what Lukas saying. Samza packaging requirements are not
> friendly,
> > I use the ThreadJobFactory for the same reason.
> >
> > Bruno
> >
> > On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys <lu...@doubledutch.me>
> > wrote:
> >
> > > Hi Yan,
> > >
> > > We use Samza in a production environment using ProcessJobFactory in
> > Docker
> > > containers because it greatly simplifies our deployment process and
> makes
> > > much better use of resources.
> > >
> > > Is there any plan to make the ThreadJobFactory or ProcessJobFactory
> > > multithreaded? I will look into doing that myself, but I think it might
> > be
> > > useful to implement this for everyone. I am sure there are plenty of
> > cases
> > > where people do not want to use YARN, but want more parallelism in
> their
> > > tasks.
> > >
> > > Lukas
> > >
> > > -----Original Message----- From: Yan Fang
> > > Sent: Monday, September 14, 2015 11:08 AM
> > > To: dev@samza.apache.org
> > > Subject: Re: Runtime Execution Model
> > >
> > >
> > > Hi Bruno,
> > >
> > > AFAIK, there is no existing JobFactory that brings as many threads as
> the
> > > partition number. But I think nothing stops you to implement this: you
> > can
> > > get the partition information from the JobCoordinator, and then bring
> as
> > > many threads as the partition/task number.
> > >
> > > Since the two local factories (ThreadJobFactory and ProcessJobFactory)
> > are
> > > mainly for development, there is no additional document. But most of
> the
> > > code here
> > > <
> > >
> >
> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
> > > >
> > > is
> > > self-explained.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang724@gmail.com
> > >
> > > On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <
> bruno.bonacci@gmail.com>
> > > wrote:
> > >
> > > Hi,
> > >> I'm looking for additional documentation on the different RUNTIME
> > >> EXECUTION MODELS of the different `job.factory.class`.
> > >>
> > >> I'm particularly interested on how each factory (ThreadJobFactory,
> > >> ProcessJobFactory and YarnJobFactory) will create tasks consume and
> > >> process
> > >> messages out of Kafka and the thread model used.
> > >>
> > >> I did a few tests with the ThreadJob factory consuming out of a kafka
> > >> topic with 5 partitions and I was expecting that it would use multiple
> > >> threads to consume/process the different partitions, however it is
> > >> using only one thread at runtime.
> > >>
> > >> Is there any way to tell Samza to use multiple processing threads (1
> per
> > >> partition)??
> > >>
> > >>
> > >> Thanks
> > >> Bruno
> > >>
> > >>
> > >
> >
>

Re: Runtime Execution Model

Posted by Hannes Stockner <ha...@gmail.com>.
+1


On Tue, Sep 15, 2015 at 5:43 PM, Bruno Bonacci <br...@gmail.com>
wrote:

> Hi,
>
> I support what Lukas saying. Samza packaging requirements are not friendly,
> I use the ThreadJobFactory for the same reason.
>
> Bruno
>
> On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys <lu...@doubledutch.me>
> wrote:
>
> > Hi Yan,
> >
> > We use Samza in a production environment using ProcessJobFactory in
> Docker
> > containers because it greatly simplifies our deployment process and makes
> > much better use of resources.
> >
> > Is there any plan to make the ThreadJobFactory or ProcessJobFactory
> > multithreaded? I will look into doing that myself, but I think it might
> be
> > useful to implement this for everyone. I am sure there are plenty of
> cases
> > where people do not want to use YARN, but want more parallelism in their
> > tasks.
> >
> > Lukas
> >
> > -----Original Message----- From: Yan Fang
> > Sent: Monday, September 14, 2015 11:08 AM
> > To: dev@samza.apache.org
> > Subject: Re: Runtime Execution Model
> >
> >
> > Hi Bruno,
> >
> > AFAIK, there is no existing JobFactory that brings as many threads as the
> > partition number. But I think nothing stops you to implement this: you
> can
> > get the partition information from the JobCoordinator, and then bring as
> > many threads as the partition/task number.
> >
> > Since the two local factories (ThreadJobFactory and ProcessJobFactory)
> are
> > mainly for development, there is no additional document. But most of the
> > code here
> > <
> >
> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
> > >
> > is
> > self-explained.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang724@gmail.com
> >
> > On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <br...@gmail.com>
> > wrote:
> >
> > Hi,
> >> I'm looking for additional documentation on the different RUNTIME
> >> EXECUTION MODELS of the different `job.factory.class`.
> >>
> >> I'm particularly interested on how each factory (ThreadJobFactory,
> >> ProcessJobFactory and YarnJobFactory) will create tasks consume and
> >> process
> >> messages out of Kafka and the thread model used.
> >>
> >> I did a few tests with the ThreadJob factory consuming out of a kafka
> >> topic with 5 partitions and I was expecting that it would use multiple
> >> threads to consume/process the different partitions, however it is
> >> using only one thread at runtime.
> >>
> >> Is there any way to tell Samza to use multiple processing threads (1 per
> >> partition)??
> >>
> >>
> >> Thanks
> >> Bruno
> >>
> >>
> >
>

Re: Runtime Execution Model

Posted by Bruno Bonacci <br...@gmail.com>.
Hi,

I support what Lukas is saying. Samza's packaging requirements are not
friendly; I use the ThreadJobFactory for the same reason.

Bruno

On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

> Hi Yan,
>
> We use Samza in a production environment using ProcessJobFactory in Docker
> containers because it greatly simplifies our deployment process and makes
> much better use of resources.
>
> Is there any plan to make the ThreadJobFactory or ProcessJobFactory
> multithreaded? I will look into doing that myself, but I think it might be
> useful to implement this for everyone. I am sure there are plenty of cases
> where people do not want to use YARN, but want more parallelism in their
> tasks.
>
> Lukas
>
> -----Original Message----- From: Yan Fang
> Sent: Monday, September 14, 2015 11:08 AM
> To: dev@samza.apache.org
> Subject: Re: Runtime Execution Model
>
>
> Hi Bruno,
>
> AFAIK, there is no existing JobFactory that brings as many threads as the
> partition number. But I think nothing stops you to implement this: you can
> get the partition information from the JobCoordinator, and then bring as
> many threads as the partition/task number.
>
> Since the two local factories (ThreadJobFactory and ProcessJobFactory) are
> mainly for development, there is no additional document. But most of the
> code here
> <
> https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local
> >
> is
> self-explained.
>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <br...@gmail.com>
> wrote:
>
> Hi,
>> I'm looking for additional documentation on the different RUNTIME
>> EXECUTION MODELS of the different `job.factory.class`.
>>
>> I'm particularly interested on how each factory (ThreadJobFactory,
>> ProcessJobFactory and YarnJobFactory) will create tasks consume and
>> process
>> messages out of Kafka and the thread model used.
>>
>> I did a few tests with the ThreadJob factory consuming out of a kafka
>> topic with 5 partitions and I was expecting that it would use multiple
>> threads to consume/process the different partitions, however it is
>> using only one thread at runtime.
>>
>> Is there any way to tell Samza to use multiple processing threads (1 per
>> partition)??
>>
>>
>> Thanks
>> Bruno
>>
>>
>

Re: Runtime Execution Model

Posted by Lukas Steiblys <lu...@doubledutch.me>.
Hi Yan,

We use Samza in a production environment using ProcessJobFactory in Docker 
containers because it greatly simplifies our deployment process and makes 
much better use of resources.

Is there any plan to make the ThreadJobFactory or ProcessJobFactory 
multithreaded? I will look into doing that myself, but I think it might be 
useful to implement this for everyone. I am sure there are plenty of cases 
where people do not want to use YARN, but want more parallelism in their 
tasks.

Lukas

-----Original Message----- 
From: Yan Fang
Sent: Monday, September 14, 2015 11:08 AM
To: dev@samza.apache.org
Subject: Re: Runtime Execution Model

Hi Bruno,

AFAIK, there is no existing JobFactory that brings as many threads as the
partition number. But I think nothing stops you to implement this: you can
get the partition information from the JobCoordinator, and then bring as
many threads as the partition/task number.

Since the two local factories (ThreadJobFactory and ProcessJobFactory) are
mainly for development, there is no additional document. But most of the
code here
<https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local>
is
self-explained.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <br...@gmail.com>
wrote:

> Hi,
> I'm looking for additional documentation on the different RUNTIME
> EXECUTION MODELS of the different `job.factory.class`.
>
> I'm particularly interested on how each factory (ThreadJobFactory,
> ProcessJobFactory and YarnJobFactory) will create tasks consume and 
> process
> messages out of Kafka and the thread model used.
>
> I did a few tests with the ThreadJob factory consuming out of a kafka
> topic with 5 partitions and I was expecting that it would use multiple
> threads to consume/process the different partitions, however it is
> using only one thread at runtime.
>
> Is there any way to tell Samza to use multiple processing threads (1 per
> partition)??
>
>
> Thanks
> Bruno
> 


Re: Runtime Execution Model

Posted by Yan Fang <ya...@gmail.com>.
Hi Bruno,

AFAIK, there is no existing JobFactory that spins up as many threads as
there are partitions. But I think nothing stops you from implementing this
yourself: you can get the partition information from the JobCoordinator and
then spawn as many threads as there are partitions/tasks.
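
For example (purely a sketch, not existing Samza code), once you know the
partitions assigned to the job you could start one processing thread per
partition; processPartition() below is a hypothetical stand-in for the
per-partition consume/process loop:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ThreadPerPartitionSketch {
      public static void main(String[] args) {
        // Pretend this list came from the JobCoordinator / job model.
        List<Integer> partitions = Arrays.asList(0, 1, 2, 3, 4);

        List<Thread> workers = new ArrayList<>();
        for (Integer partition : partitions) {
          Thread t = new Thread(
              () -> processPartition(partition), "partition-" + partition);
          t.start();
          workers.add(t);
        }
      }

      private static void processPartition(int partition) {
        // hypothetical: poll the partition and feed its messages to the task
      }
    }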

Since the two local factories (ThreadJobFactory and ProcessJobFactory) are
mainly for development, there is no additional documentation. But most of the
code here
<https://github.com/apache/samza/tree/master/samza-core/src/main/scala/org/apache/samza/job/local>
is self-explanatory.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci <br...@gmail.com>
wrote:

> Hi,
> I'm looking for additional documentation on the different RUNTIME
> EXECUTION MODELS of the different `job.factory.class`.
>
> I'm particularly interested on how each factory (ThreadJobFactory,
> ProcessJobFactory and YarnJobFactory) will create tasks consume and process
> messages out of Kafka and the thread model used.
>
> I did a few tests with the ThreadJob factory consuming out of a kafka
> topic with 5 partitions and I was expecting that it would use multiple
> threads to consume/process the different partitions, however it is
> using only one thread at runtime.
>
> Is there any way to tell Samza to use multiple processing threads (1 per
> partition)??
>
>
> Thanks
> Bruno
>