Posted to dev@samza.apache.org by XiaoChuan Yu <xi...@kik.com> on 2017/08/09 17:07:53 UTC

Custom ordering when using async

Hi,

I have a few questions regarding the order of processing when using
processAsync.

The LinkedIn article here
<https://engineering.linkedin.com/blog/2017/01/asynchronous-processing-and-multithreading-in-apache-samza--part>
mentions the following:
"For parallelism within a task, Samza guarantees processAsync will be
invoked in order for a task. The processing or completion, however, can go
out of order. With this guarantee, users can implement sub-task-level data
pipelining with customized ordering and parallelism. For example, users can
use a keyed single thread executor pool to have in-order processing per key
while processing messages with different keys in parallel."

1. What exactly is meant by a "keyed single thread executor pool"? Are
there any code examples available on what this looks like?
2. I need to process a stream keyed on user IDs in parallel using
processAsync, but I would like each user's events to be processed in order.
Does this require the custom ordering logic mentioned in the article?

Thanks,
Xiaochuan Yu

Re: Custom ordering when using async

Posted by Yi Pan <ni...@gmail.com>.
Hi, Xiaochuan,

Please refer to the document here:
https://samza.apache.org/learn/tutorials/0.13/samza-async-user-guide.html

As stated, by default, when task.max.concurrency=1, in-order processing
within a task is guaranteed, no matter whether you implement StreamTask or
AsyncStreamTask. The engineering blog that you referred to at the beginning
describes the AsyncStreamTask case, where completion of the pending
process() calls may need special treatment to ensure in-order completion
(note that in-order invocation of process() calls within a task is always
guaranteed). For your use case, I would suggest that you:
- use StreamTask
- configure x number of threads in the thread pool
- set task.max.concurrency = 1

What you get here from the above configuration is:
- in-order process() invocation and completion within a task
- maximum of x tasks can be executed in parallel in a container
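
As a rough sketch, the configuration suggested above might look like the
following fragment of the job's properties file. The value 4 is only an
assumed placeholder for x, and job.container.thread.pool.size is the config
name used elsewhere in this thread for the container thread pool.

```properties
# Assumed placeholder: 4 stands in for "x"; tune it per container.
job.container.thread.pool.size=4
# One in-flight message per task keeps process() invocation and completion in order.
task.max.concurrency=1
```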

If that fits your performance requirement, it would be the simplest
configuration for your use case. Otherwise, let us know if you want to
explore options to maintain in-order process() completion with
task.max.concurrency > 1.

Best!

-Yi

On Fri, Aug 25, 2017 at 5:02 PM, XiaoChuan Yu <xi...@kik.com> wrote:

> Hi Jagadish,
>
> This is a rather late reply, but I don't think I understand the effect of
> changing the job.container.thread.pool.size config in the synchronous case
> very well.
> Suppose I have an input topic partitioned by memberID processed by a
> synchronous job.
> Does increasing container thread pool size still guarantee that the
> messages will be processed serially for messages with the same key?
> For example, if I had 2 messages msg1 followed by msg2 with the same key
> then is it guaranteed that msg1 will finish processing before starting to
> process msg2?
>
> My actual situation is that we currently have a job that has maxed out the
> allowed number of containers but is still not processing messages fast
> enough.
> We want to speed up processing but we would like to avoid increasing the
> number of partitions of the input topic.
> The job is IO bound and has the requirement that messages with the same key
> are processed serially.
>
> Thanks,
> Xiaochuan Yu

Re: Custom ordering when using async

Posted by XiaoChuan Yu <xi...@kik.com>.
Hi Jagadish,

This is a rather late reply, but I don't think I understand the effect of
changing the job.container.thread.pool.size config in the synchronous case
very well.
Suppose I have an input topic partitioned by memberID processed by a
synchronous job.
Does increasing container thread pool size still guarantee that the
messages will be processed serially for messages with the same key?
For example, if I had 2 messages msg1 followed by msg2 with the same key
then is it guaranteed that msg1 will finish processing before starting to
process msg2?

My actual situation is that we currently have a job that has maxed out the
allowed number of containers but is still not processing messages fast
enough.
We want to speed up processing but we would like to avoid increasing the
number of partitions of the input topic.
The job is IO bound and has the requirement that messages with the same key
are processed serially.

Thanks,
Xiaochuan Yu

On Thu, Aug 10, 2017 at 11:28 PM Jagadish Venkatraman <
jagadish1989@gmail.com> wrote:

> Hi XiaoChuan,
>
> Are you setting task.max.concurrency > 1 that allows multiple messages
> in-flight? (The "keyed executor pool" is only meaningful with that
> scenario)
>
> Also, Have you tried increasing your job.container.thread.pool.size config
> and setting it to the number of tasks in the container? Given that your
> input topic is already partitioned by memberID, it'll probably be simpler
> to try this first, benchmark your QPS and see if it meets your performance
> goals. I'd tune these config-knobs first and confirm that you need the
> "keyed executor thread pool". You may find that it introduces more
> complexity.
>
> Please let us know if you had further questions. We are happy to further
> help you tune your job for maximum performance.

Re: Custom ordering when using async

Posted by Jagadish Venkatraman <ja...@gmail.com>.
Hi XiaoChuan,

Are you setting task.max.concurrency > 1, which allows multiple messages
in flight? (The "keyed executor pool" is only meaningful in that
scenario.)

Also, have you tried increasing your job.container.thread.pool.size config
and setting it to the number of tasks in the container? Given that your
input topic is already partitioned by memberID, it'll probably be simpler
to try this first, benchmark your QPS and see if it meets your performance
goals. I'd tune these config-knobs first and confirm that you need the
"keyed executor thread pool". You may find that it introduces more
complexity.

Please let us know if you have further questions. We are happy to help
you tune your job for maximum performance.



On Wed, Aug 9, 2017 at 4:03 PM, xinyu liu <xi...@gmail.com> wrote:

> Hi, XiaoChuan,
>
> For your questions:
>
> 1. By "keyed single thread executor pool", it means something like a map
> from a key to a single thread executor, like Map<String, Executor> where
> each Executor is a Executors.newSingleThreadExecutor()
> <https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()>.
> This means for a particular key, it will be executed in a designated
> thread, which guarantees the ordering of the key.
>
> 2. For your use case, you can create the above keyed executors by setting
> the key being some hash of the user id. For example:
>
> Map<Integer, Executor> keyedExecutors = new HashMap<>();
>
> in processAsync():
> String memberId = ....
> int hash = memberId.hashCode(); // you can reduce the hash size by %
> Executor executor = keyedExecutors.get(hash);
> if (executor == null) {
>   executor = Executors.newSingleThreadExecutor();
>   keyedExecutors.put(hash, executor);
> }
>
> executor.execute(() -> process your message here);
> ...
>
> So the same user will always be executed in a single thread, which ensures
> the ordering. Does this make sense to you?
>
> Thanks,
> Xinyu



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University

Re: Custom ordering when using async

Posted by xinyu liu <xi...@gmail.com>.
Hi, XiaoChuan,

For your questions:

1. A "keyed single thread executor pool" means something like a map
from a key to a single-thread executor, such as Map<String, Executor>, where
each Executor is created by Executors.newSingleThreadExecutor()
<https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()>.
This means the work for a particular key always runs on one designated
thread, which guarantees ordering for that key.

2. For your use case, you can create the above keyed executors by using
some hash of the user ID as the key. For example:

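Map<Integer, Executor> keyedExecutors = new HashMap<>();

in processAsync():
String memberId = ....
// hashCode() may be negative; Math.floorMod(hash, n) caps the map at n executors
int hash = memberId.hashCode();
Executor executor = keyedExecutors.get(hash);
if (executor == null) {
  // first message for this key: give it its own single-thread executor
  executor = Executors.newSingleThreadExecutor();
  keyedExecutors.put(hash, executor);
}

// run the work on this key's thread; call callback.complete() when it finishes
executor.execute(() -> { /* process your message here */ });
...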

So messages for the same user will always be executed on a single thread,
which ensures their ordering. Does this make sense to you?
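
For readers who want something they can compile, here is a minimal
plain-Java sketch of the same idea with a fixed number of buckets. It does
not depend on Samza: the class name KeyedExecutors, its methods, and the
bucket count are all hypothetical names chosen for illustration. In a real
processAsync() the submitted Runnable would presumably finish by calling
callback.complete() on the TaskCallback it was given.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: routes work to one single-thread executor per key bucket. */
final class KeyedExecutors {
    private final ExecutorService[] buckets;

    KeyedExecutors(int numBuckets) {
        buckets = new ExecutorService[numBuckets];
        for (int i = 0; i < numBuckets; i++) {
            buckets[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Same key -> same bucket -> same thread, so per-key order is submission order. */
    void submit(String key, Runnable work) {
        // floorMod avoids a negative index when hashCode() is negative.
        int bucket = Math.floorMod(key.hashCode(), buckets.length);
        buckets[bucket].execute(work);
    }

    /** Stops accepting work; returns true if all queued work finished in time. */
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        for (ExecutorService e : buckets) {
            e.shutdown();
        }
        try {
            for (ExecutorService e : buckets) {
                if (!e.awaitTermination(timeout, unit)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        KeyedExecutors pool = new KeyedExecutors(4);
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        for (int seq = 0; seq < 5; seq++) {
            final int n = seq;
            for (String user : new String[] {"alice", "bob", "carol"}) {
                // Each user's sequence numbers are recorded on that user's thread.
                pool.submit(user, () ->
                    seen.computeIfAbsent(user, k -> new CopyOnWriteArrayList<>()).add(n));
            }
        }
        pool.shutdownAndWait(10, TimeUnit.SECONDS);
        System.out.println(seen);
    }
}
```

Different users may share a bucket when their hashes collide, which costs
some parallelism but does not break per-user ordering. Fixing the number of
buckets up front also bounds how many threads the task creates.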

Thanks,
Xinyu



On Wed, Aug 9, 2017 at 10:07 AM, XiaoChuan Yu <xi...@kik.com> wrote:

> Hi,
>
> I have a few questions regarding the order of processing when using
> processAsync.
>
> From the LinkedIn article here
> <https://engineering.linkedin.com/blog/2017/01/asynchronous-processing-and-multithreading-in-apache-samza--part>
> it
> mentions the following:
> "For parallelism within a task, Samza guarantees processAsync will be
> invoked in order for a task. The processing or completion, however, can go
> out of order. With this guarantee, users can implement sub-task-level data
> pipelining with customized ordering and parallelism. For example, users can
> use a keyed single thread executor pool to have in-order processing per key
> while processing messages with different keys in parallel."
>
> 1. What exactly is meant by a "keyed single thread executor pool"? Are
> there any code examples available on what this looks like?
> 2. I need to process a stream keyed on user IDs in parallel using
> processAsync but would like each user's event be processed in order. Does
> this then require custom ordering logic mentioned in the article?
>
> Thanks,
> Xiaochuan Yu
>