You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Jordan Shaw <jo...@pubnub.com> on 2015/02/06 09:00:53 UTC

container concurrency and pipelining

Hi everyone,
I've done some raw Disk, Kafka and Samza benchmarking. I peaked out a
single Samza container's consumer at around 2MB/s. Running a Kafka Consumer
Perf test though on the same machine I can do 100's of MB/s. It seems like
most of the bottleneck exists in the Kafka async client. There appears to
be only 1 thread in the Kafka client rather than a thread pool and due to
the limitation that a container can't run on multiple cores this thread
gets scheduled I assume on the same core as the consumer and process call.

I know a lot thought has been put into the design of maintaining parity
between task instances and partitions and preventing unpredictable behavior
from a threaded system. A reasonable solution might be to just add
partitions and increase container count with the partition count. This is
at the cost of increasing memory usage on the node managers necessarily due
to the increased container count.

Has there been any design discussions into allowing multiple cores on on a
single container to allow better pipelining within the container to get
better throughput and also introducing a thread pool outside of Kafka's
client to allow concurrent produces to Kafka within the same container? I
understand there are ordering concerns with this concurrency and for those
sensitive use cases the thread pool could be 1 but for use cases where
ordering is less important and raw throughput is more of a concern they can
achieve that with allowing current async produces. I also know that Kafka
has plans to rework their producer but I haven't been able to find if this
includes introducing a thread pool to allow multiple async produces.
Lastly, has anyone been able to get more MB/s out of a container than what
I have? Thanks!

-- 
Jordan

Re: container concurrency and pipelining

Posted by Chris Riccomini <cr...@apache.org>.
Hey Jordan,

That's awesome! Yes, 9mb/s should be do-able, so I'm glad to hear that
worked. :)

In practice, what we've found is that serde does take most of the time.
Protobuf, Thrift, or Avro are usually the ones people end up using when
they care about performance at the level you're talking about. You should
also examine compression to determine whether snappy is fast enough.

> If I can get some time I can try and tackle:
https://issues.apache.org/jira/browse/SAMZA-6 following your
recommendations to get some more formal results. Thanks!

Cool, don't sweat it if you can't. I think SAMZA-548 is a pretty good
start. I noted some deficiencies in the test, which Jay also mentioned, but
if we fix those up, I think that test should be quite useful for running
perf tests on Samza containers. SAMZA-6 is a pretty big ticket, but if you
want to fiddle around with SAMZA-548, that'd be cool. :)

Cheers,
Chris

On Tue, Feb 10, 2015 at 2:43 PM, Jordan Shaw <jo...@pubnub.com> wrote:

> Hey Chris,
> Good News! ...sorta. We found that our serialization serde (msgpack) was
> taking 4x our process time, and when I changed the serde to String it
> supported our test traffic rates of 9MB/s without any signs of not being
> able to support more see here:[image: Inline image 1]
>
> We also benchmarked json and some other ones and haven't found anything
> fast enough yet. So we're going to do some research on this and see if we
> can find a fast serializer for us, might go down the PB, Thrift or Flat
> Buffer route. FWIW just upgrading to kafka 0.8.2 and Samza 0.9-SNAPSHOT I
> saw a increase in maybe 500KB/s and that's without changing any of the
> tuning on the producer or consumer. Here's a SS of that graph:
> [image: Inline image 3]
>
> If I can get some time I can try and tackle:
> https://issues.apache.org/jira/browse/SAMZA-6 following your
> recommendations to get some more formal results. Thanks!
> -Jordan
>
>
> On Tue, Feb 10, 2015 at 10:27 AM, Jordan Shaw <jo...@pubnub.com> wrote:
>
>> Hey Chris,
>> We've done pretty extensive testing already on that task. Here's a SS of
>> a sample of those results showing the 2MB/s rate. I haven't done those
>> profiling specifically, we were running htop and a network profiler to get
>> a general idea of system consumption. We'll add that to our todo's for
>> testing.
>>
>> [image: Inline image 1]
>>
>> Yesterday I was trying to get get your zopkio task to run on our cluster
>> and see if I get better through put. I almost got there but the
>> zopkio(python) kafka client wasn't connecting to my kafka cluster so
>> working on resolving those kind of issues and hopefully get that done
>> today. This is the error I was getting from zopkio:
>>
>> [Errno 111] Connection refused
>>
>> Other messages:
>>
>> Traceback (most recent call last):
>>   File
>> "/tmp/samza-tests/samza-integration-tests/local/lib/python2.7/site-packages/zopkio/test_runner.py",
>> line 323, in _execute_single_test
>>     test.function()
>>   File "/tmp/samza-tests/scripts/tests/performance_tests.py", line 38, in
>> test_kafka_read_write_performance
>>     _load_data()
>>   File "/tmp/samza-tests/scripts/tests/performance_tests.py", line 67, in
>> _load_data
>>     kafka = util.get_kafka_client()
>>   File "/tmp/samza-tests/scripts/tests/util.py", line 60, in
>> get_kafka_client
>>     if not wait_for_server(kafka_hosts[0], kafka_port, 30):
>>   File "/tmp/samza-tests/scripts/tests/util.py", line 73, in
>> wait_for_server
>>     s.connect((host, port))
>>   File "/usr/lib/python2.7/socket.py", line 224, in meth
>>     return getattr(self._sock,name)(*args)
>> error: [Errno 111] Connection refused
>>
>> Thanks!
>>
>> -Jordan
>>
>> On Tue, Feb 10, 2015 at 9:02 AM, Chris Riccomini <cr...@apache.org>
>> wrote:
>>
>>> Hey Jordan,
>>>
>>> It looks like your task is almost identical to the one in SAMZA-548. Did
>>> you have a chance to test your job out with 0.9?
>>>
>>> >  If I just consume off the envelope I was seeing much faster consume
>>> rates. Which was one of the indications that the producer was causing
>>> problems.
>>>
>>> Yes, this sounds believable. Did you attach visual VM, and do CPU
>>> sampling?
>>> It'd be good to get a view of exactly where in the "produce" call things
>>> are slow.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On Sun, Feb 8, 2015 at 9:47 PM, Jordan Shaw <jo...@pubnub.com> wrote:
>>>
>>> > Hey Chris,
>>> > Sorry for the delayed response, did a Tahoe 3 day weekend.
>>> >
>>> > Could you post your configs, and version of Samza that you're running?
>>> > https://gist.github.com/jshaw86/02dbca21ae32d1a9a24e. We were running
>>> 0.8
>>> > Samza the latest stable release. We upgraded to the 0.9 branch on
>>> Friday
>>> > and Kafka as well so we'll go over that starting tomorrow.
>>> >
>>> > How many threads were you running? Can you describe (or post) the two
>>> tests
>>> > that you did?
>>> > We ran a few different thread combinations of kafka-consumer-perf,sh
>>> and
>>> > kafka-producer-perf.sh, 1 or 3 Producer Threads, 1 or N Consumer
>>> Threads
>>> > where N = Partition Count. We only wen't up to 2 partitions though. We
>>> just
>>> > ran the consumer and producer perf tests individually( not
>>> concurrently )
>>> > here are the results:
>>> https://gist.github.com/jshaw86/0bdd4d5bb1e233cd0b3f
>>> >
>>> > Here is task I was using to do the Samza perf of the consumer and
>>> producer:
>>> > https://gist.github.com/jshaw86/9c09a16112eee440f681. It's pretty
>>> basic
>>> > the
>>> > idea is just get a message off the envelope and send it back out as
>>> fast as
>>> > possible and that's where I was seeing the 2MB/s. If I just consume
>>> off the
>>> > envelope I was seeing much faster consume rates. Which was one of the
>>> > indications that the producer was causing problems.
>>> >
>>> > Thanks a lot for your perf tests I'll review it tomorrow against my
>>> configs
>>> > and see if I can come up with what I'm doing wrong. I also submitted
>>> the
>>> > JIRA per your request. Cheers.
>>> >
>>> >
>>> > On Sun, Feb 8, 2015 at 8:21 PM, Chris Riccomini <criccomini@apache.org
>>> >
>>> > wrote:
>>> >
>>> > > Hey Jordan,
>>> > >
>>> > > I've put up a perf test on:
>>> > >
>>> > >   https://issues.apache.org/jira/browse/SAMZA-548
>>> > >
>>> > > The JIRA describes the test implementation, observed performance, and
>>> > noted
>>> > > deficiencies in the test. I'm getting much more than 2mb/s.
>>> > >
>>> > > Cheers,
>>> > > Chris
>>> > >
>>> > > On Fri, Feb 6, 2015 at 8:34 AM, Chris Riccomini <
>>> criccomini@apache.org>
>>> > > wrote:
>>> > >
>>> > > > Hey Jordan,
>>> > > >
>>> > > > > I peaked out a single Samza container's consumer at around 2MB/s.
>>> > > >
>>> > > > Could you post your configs, and version of Samza that you're
>>> running?
>>> > > >
>>> > > > > Running a Kafka Consumer Perf test though on the same machine I
>>> can
>>> > do
>>> > > > 100's of MB/s.
>>> > > >
>>> > > > How many threads were you running? Also, you're saying "consumer
>>> perf"
>>> > > > here. Consumer and producer exhibit very different throughput
>>> > > > characteristics. Can you describe (or post) the two tests that you
>>> did?
>>> > > >
>>> > > > > It seems like most of the bottleneck exists in the Kafka async
>>> > client.
>>> > > >
>>> > > > Yes, this is what we've observed as well.
>>> > > >
>>> > > > > A reasonable solution might be to just add partitions and
>>> increase
>>> > > > container count with the partition count.
>>> > > >
>>> > > > This is usually the guidance that we give. If you have 8 cores, and
>>> > want
>>> > > > to max out your machine, you should run 8 containers.
>>> > > >
>>> > > > > Has there been any design discussions into allowing multiple
>>> cores on
>>> > > > on a single container to allow better pipelining within the
>>> container?
>>> > > >
>>> > > > The discussion pretty much is what you've just described. We never
>>> felt
>>> > > > that the increase in code complexity, configs, mental model was
>>> worth
>>> > the
>>> > > > trade-off. My argument is that we should make the Kafka producer go
>>> > > faster
>>> > > > (see comments below), rather than increasing complexity in Samza
>>> to get
>>> > > > around it.
>>> > > >
>>> > > > > I also know that Kafka has plans to rework their producer but I
>>> > haven't
>>> > > > been able to find if this includes introducing a thread pool to
>>> allow
>>> > > > multiple async produces.
>>> > > >
>>> > > > We have upgraded Samza to the new producer in SAMZA-227. The code
>>> > changes
>>> > > > are on master now. You should definitely check that out.
>>> > > >
>>> > > > The new Kafka producer works as follows: there is one "sender"
>>> thread.
>>> > > > When you send messages, the messages get queued up, and the sender
>>> > thread
>>> > > > takes them off the queue, and sends them to Kafka. One trick with
>>> the
>>> > new
>>> > > > producer is that they are using NIO, and allow for pipelining.
>>> This is
>>> > > > *specifically* to address the point you made about those that care
>>> more
>>> > > > about throughput than ordering guarantees. The config of interest
>>> to
>>> > you
>>> > > is:
>>> > > >
>>> > > >   max.in.flight.requests.per.connection
>>> > > >
>>> > > > This defines how many parallel sends can be pipelined (over one
>>> socket,
>>> > > in
>>> > > > the sender thread) before the send thread blocks. Samza forces
>>> this to
>>> > 1
>>> > > > right now (because we wanted to guarantee ordering). It seems like
>>> a
>>> > > > reasonable request to allow users to over-ride this with their own
>>> > > setting
>>> > > > if they want more parallelism. Could you open a JIRA for that?
>>> > > >
>>> > > > I should note, in smoke tests, with max-in-flight set to one in
>>> Samza,
>>> > > the
>>> > > > perf seemed roughly on-par with the Samza running the old Kafka
>>> > > producer. I
>>> > > > also spoke to Jay at the last Kafka meetup, and he mentioned that
>>> they
>>> > > > don't see much of a performance boost when running max-in-flight >
>>> 1.
>>> > Jun
>>> > > > did some perf comparison between the old and new Kafka producer,
>>> and
>>> > put
>>> > > > the information on some slides that he presented at the meetup. If
>>> > you're
>>> > > > interested, you should ping them on the Kafka mailing list.
>>> > > >
>>> > > > > Lastly, has anyone been able to get more MB/s out of a container
>>> than
>>> > > > what I have?
>>> > > >
>>> > > > Thus far, I (personally) haven't spent much time on producer-side
>>> > > > optimization, so I don't have hard numbers on it. Our producer
>>> code is
>>> > > > pretty thin, so we're pretty much bound to what the Kafka producer
>>> can
>>> > > > do.If you're up for it, you might want to contribute something to:
>>> > > >
>>> > > >   https://issues.apache.org/jira/browse/SAMZA-6
>>> > > >
>>> > > > Here's what I'd recommend:
>>> > > >
>>> > > > 0. Write something reproducible and post it on SAMZA-6. For bonus
>>> > points,
>>> > > > write an equivalent raw-Kafka-producer test (no Samza) so we can
>>> > compare
>>> > > > them.
>>> > > > 1. Checkout master.
>>> > > > 2. Modify master to allow you to configure max-in-flights > 1
>>> (line 185
>>> > > of
>>> > > > KafkaConfig.scala).
>>> > > > 3. Try setting acks to 0 (it's 1 by default).
>>> > > >
>>> > > > Try running your tests at every one of these steps, and see how it
>>> > > affects
>>> > > > performance. If you get to 3, and things are still slow, we can
>>> loop in
>>> > > > some Kakfa-dev folks.
>>> > > >
>>> > > > Cheers,
>>> > > > Chris
>>> > > >
>>> > > > On Fri, Feb 6, 2015 at 12:00 AM, Jordan Shaw <jo...@pubnub.com>
>>> > wrote:
>>> > > >
>>> > > >> Hi everyone,
>>> > > >> I've done some raw Disk, Kafka and Samza benchmarking. I peaked
>>> out a
>>> > > >> single Samza container's consumer at around 2MB/s. Running a Kafka
>>> > > >> Consumer
>>> > > >> Perf test though on the same machine I can do 100's of MB/s. It
>>> seems
>>> > > like
>>> > > >> most of the bottleneck exists in the Kafka async client. There
>>> appears
>>> > > to
>>> > > >> be only 1 thread in the Kafka client rather than a thread pool
>>> and due
>>> > > to
>>> > > >> the limitation that a container can't run on multiple cores this
>>> > thread
>>> > > >> gets scheduled I assume on the same core as the consumer and
>>> process
>>> > > call.
>>> > > >>
>>> > > >> I know a lot thought has been put into the design of maintaining
>>> > parity
>>> > > >> between task instances and partitions and preventing unpredictable
>>> > > >> behavior
>>> > > >> from a threaded system. A reasonable solution might be to just add
>>> > > >> partitions and increase container count with the partition count.
>>> This
>>> > > is
>>> > > >> at the cost of increasing memory usage on the node managers
>>> > necessarily
>>> > > >> due
>>> > > >> to the increased container count.
>>> > > >>
>>> > > >> Has there been any design discussions into allowing multiple
>>> cores on
>>> > > on a
>>> > > >> single container to allow better pipelining within the container
>>> to
>>> > get
>>> > > >> better throughput and also introducing a thread pool outside of
>>> > Kafka's
>>> > > >> client to allow concurrent produces to Kafka within the same
>>> > container?
>>> > > I
>>> > > >> understand there are ordering concerns with this concurrency and
>>> for
>>> > > those
>>> > > >> sensitive use cases the thread pool could be 1 but for use cases
>>> where
>>> > > >> ordering is less important and raw throughput is more of a concern
>>> > they
>>> > > >> can
>>> > > >> achieve that with allowing current async produces. I also know
>>> that
>>> > > Kafka
>>> > > >> has plans to rework their producer but I haven't been able to
>>> find if
>>> > > this
>>> > > >> includes introducing a thread pool to allow multiple async
>>> produces.
>>> > > >> Lastly, has anyone been able to get more MB/s out of a container
>>> than
>>> > > what
>>> > > >> I have? Thanks!
>>> > > >>
>>> > > >> --
>>> > > >> Jordan
>>> > > >>
>>> > > >
>>> > > >
>>> > >
>>> >
>>> >
>>> >
>>> > --
>>> > Jordan Shaw
>>> > Full Stack Software Engineer
>>> > PubNub Inc
>>> > 1045 17th St
>>> > San Francisco, CA 94107
>>> >
>>>
>>
>>
>>
>> --
>> Jordan Shaw
>> Full Stack Software Engineer
>> PubNub Inc
>> 1045 17th St
>> San Francisco, CA 94107
>>
>
>
>
> --
> Jordan Shaw
> Full Stack Software Engineer
> PubNub Inc
> 1045 17th St
> San Francisco, CA 94107
>

Re: container concurrency and pipelining

Posted by Jordan Shaw <jo...@pubnub.com>.
Hey Chris,
Good News! ...sorta. We found that our serialization serde (msgpack) was
taking 4x our process time, and when I changed the serde to String it
supported our test traffic rates of 9MB/s without any signs of not being
able to support more see here:[image: Inline image 1]

We also benchmarked json and some other ones and haven't found anything
fast enough yet. So we're going to do some research on this and see if we
can find a fast serializer for us, might go down the PB, Thrift or Flat
Buffer route. FWIW just upgrading to kafka 0.8.2 and Samza 0.9-SNAPSHOT I
saw a increase in maybe 500KB/s and that's without changing any of the
tuning on the producer or consumer. Here's a SS of that graph:
[image: Inline image 3]

If I can get some time I can try and tackle:
https://issues.apache.org/jira/browse/SAMZA-6 following your
recommendations to get some more formal results. Thanks!
-Jordan


On Tue, Feb 10, 2015 at 10:27 AM, Jordan Shaw <jo...@pubnub.com> wrote:

> Hey Chris,
> We've done pretty extensive testing already on that task. Here's a SS of a
> sample of those results showing the 2MB/s rate. I haven't done those
> profiling specifically, we were running htop and a network profiler to get
> a general idea of system consumption. We'll add that to our todo's for
> testing.
>
> [image: Inline image 1]
>
> Yesterday I was trying to get get your zopkio task to run on our cluster
> and see if I get better through put. I almost got there but the
> zopkio(python) kafka client wasn't connecting to my kafka cluster so
> working on resolving those kind of issues and hopefully get that done
> today. This is the error I was getting from zopkio:
>
> [Errno 111] Connection refused
>
> Other messages:
>
> Traceback (most recent call last):
>   File
> "/tmp/samza-tests/samza-integration-tests/local/lib/python2.7/site-packages/zopkio/test_runner.py",
> line 323, in _execute_single_test
>     test.function()
>   File "/tmp/samza-tests/scripts/tests/performance_tests.py", line 38, in
> test_kafka_read_write_performance
>     _load_data()
>   File "/tmp/samza-tests/scripts/tests/performance_tests.py", line 67, in
> _load_data
>     kafka = util.get_kafka_client()
>   File "/tmp/samza-tests/scripts/tests/util.py", line 60, in
> get_kafka_client
>     if not wait_for_server(kafka_hosts[0], kafka_port, 30):
>   File "/tmp/samza-tests/scripts/tests/util.py", line 73, in
> wait_for_server
>     s.connect((host, port))
>   File "/usr/lib/python2.7/socket.py", line 224, in meth
>     return getattr(self._sock,name)(*args)
> error: [Errno 111] Connection refused
>
> Thanks!
>
> -Jordan
>
> On Tue, Feb 10, 2015 at 9:02 AM, Chris Riccomini <cr...@apache.org>
> wrote:
>
>> Hey Jordan,
>>
>> It looks like your task is almost identical to the one in SAMZA-548. Did
>> you have a chance to test your job out with 0.9?
>>
>> >  If I just consume off the envelope I was seeing much faster consume
>> rates. Which was one of the indications that the producer was causing
>> problems.
>>
>> Yes, this sounds believable. Did you attach visual VM, and do CPU
>> sampling?
>> It'd be good to get a view of exactly where in the "produce" call things
>> are slow.
>>
>> Cheers,
>> Chris
>>
>> On Sun, Feb 8, 2015 at 9:47 PM, Jordan Shaw <jo...@pubnub.com> wrote:
>>
>> > Hey Chris,
>> > Sorry for the delayed response, did a Tahoe 3 day weekend.
>> >
>> > Could you post your configs, and version of Samza that you're running?
>> > https://gist.github.com/jshaw86/02dbca21ae32d1a9a24e. We were running
>> 0.8
>> > Samza the latest stable release. We upgraded to the 0.9 branch on Friday
>> > and Kafka as well so we'll go over that starting tomorrow.
>> >
>> > How many threads were you running? Can you describe (or post) the two
>> tests
>> > that you did?
>> > We ran a few different thread combinations of kafka-consumer-perf,sh and
>> > kafka-producer-perf.sh, 1 or 3 Producer Threads, 1 or N Consumer Threads
>> > where N = Partition Count. We only wen't up to 2 partitions though. We
>> just
>> > ran the consumer and producer perf tests individually( not concurrently
>> )
>> > here are the results:
>> https://gist.github.com/jshaw86/0bdd4d5bb1e233cd0b3f
>> >
>> > Here is task I was using to do the Samza perf of the consumer and
>> producer:
>> > https://gist.github.com/jshaw86/9c09a16112eee440f681. It's pretty basic
>> > the
>> > idea is just get a message off the envelope and send it back out as
>> fast as
>> > possible and that's where I was seeing the 2MB/s. If I just consume off
>> the
>> > envelope I was seeing much faster consume rates. Which was one of the
>> > indications that the producer was causing problems.
>> >
>> > Thanks a lot for your perf tests I'll review it tomorrow against my
>> configs
>> > and see if I can come up with what I'm doing wrong. I also submitted the
>> > JIRA per your request. Cheers.
>> >
>> >
>> > On Sun, Feb 8, 2015 at 8:21 PM, Chris Riccomini <cr...@apache.org>
>> > wrote:
>> >
>> > > Hey Jordan,
>> > >
>> > > I've put up a perf test on:
>> > >
>> > >   https://issues.apache.org/jira/browse/SAMZA-548
>> > >
>> > > The JIRA describes the test implementation, observed performance, and
>> > noted
>> > > deficiencies in the test. I'm getting much more than 2mb/s.
>> > >
>> > > Cheers,
>> > > Chris
>> > >
>> > > On Fri, Feb 6, 2015 at 8:34 AM, Chris Riccomini <
>> criccomini@apache.org>
>> > > wrote:
>> > >
>> > > > Hey Jordan,
>> > > >
>> > > > > I peaked out a single Samza container's consumer at around 2MB/s.
>> > > >
>> > > > Could you post your configs, and version of Samza that you're
>> running?
>> > > >
>> > > > > Running a Kafka Consumer Perf test though on the same machine I
>> can
>> > do
>> > > > 100's of MB/s.
>> > > >
>> > > > How many threads were you running? Also, you're saying "consumer
>> perf"
>> > > > here. Consumer and producer exhibit very different throughput
>> > > > characteristics. Can you describe (or post) the two tests that you
>> did?
>> > > >
>> > > > > It seems like most of the bottleneck exists in the Kafka async
>> > client.
>> > > >
>> > > > Yes, this is what we've observed as well.
>> > > >
>> > > > > A reasonable solution might be to just add partitions and increase
>> > > > container count with the partition count.
>> > > >
>> > > > This is usually the guidance that we give. If you have 8 cores, and
>> > want
>> > > > to max out your machine, you should run 8 containers.
>> > > >
>> > > > > Has there been any design discussions into allowing multiple
>> cores on
>> > > > on a single container to allow better pipelining within the
>> container?
>> > > >
>> > > > The discussion pretty much is what you've just described. We never
>> felt
>> > > > that the increase in code complexity, configs, mental model was
>> worth
>> > the
>> > > > trade-off. My argument is that we should make the Kafka producer go
>> > > faster
>> > > > (see comments below), rather than increasing complexity in Samza to
>> get
>> > > > around it.
>> > > >
>> > > > > I also know that Kafka has plans to rework their producer but I
>> > haven't
>> > > > been able to find if this includes introducing a thread pool to
>> allow
>> > > > multiple async produces.
>> > > >
>> > > > We have upgraded Samza to the new producer in SAMZA-227. The code
>> > changes
>> > > > are on master now. You should definitely check that out.
>> > > >
>> > > > The new Kafka producer works as follows: there is one "sender"
>> thread.
>> > > > When you send messages, the messages get queued up, and the sender
>> > thread
>> > > > takes them off the queue, and sends them to Kafka. One trick with
>> the
>> > new
>> > > > producer is that they are using NIO, and allow for pipelining. This
>> is
>> > > > *specifically* to address the point you made about those that care
>> more
>> > > > about throughput than ordering guarantees. The config of interest to
>> > you
>> > > is:
>> > > >
>> > > >   max.in.flight.requests.per.connection
>> > > >
>> > > > This defines how many parallel sends can be pipelined (over one
>> socket,
>> > > in
>> > > > the sender thread) before the send thread blocks. Samza forces this
>> to
>> > 1
>> > > > right now (because we wanted to guarantee ordering). It seems like a
>> > > > reasonable request to allow users to over-ride this with their own
>> > > setting
>> > > > if they want more parallelism. Could you open a JIRA for that?
>> > > >
>> > > > I should note, in smoke tests, with max-in-flight set to one in
>> Samza,
>> > > the
>> > > > perf seemed roughly on-par with the Samza running the old Kafka
>> > > producer. I
>> > > > also spoke to Jay at the last Kafka meetup, and he mentioned that
>> they
>> > > > don't see much of a performance boost when running max-in-flight >
>> 1.
>> > Jun
>> > > > did some perf comparison between the old and new Kafka producer, and
>> > put
>> > > > the information on some slides that he presented at the meetup. If
>> > you're
>> > > > interested, you should ping them on the Kafka mailing list.
>> > > >
>> > > > > Lastly, has anyone been able to get more MB/s out of a container
>> than
>> > > > what I have?
>> > > >
>> > > > Thus far, I (personally) haven't spent much time on producer-side
>> > > > optimization, so I don't have hard numbers on it. Our producer code
>> is
>> > > > pretty thin, so we're pretty much bound to what the Kafka producer
>> can
>> > > > do.If you're up for it, you might want to contribute something to:
>> > > >
>> > > >   https://issues.apache.org/jira/browse/SAMZA-6
>> > > >
>> > > > Here's what I'd recommend:
>> > > >
>> > > > 0. Write something reproducible and post it on SAMZA-6. For bonus
>> > points,
>> > > > write an equivalent raw-Kafka-producer test (no Samza) so we can
>> > compare
>> > > > them.
>> > > > 1. Checkout master.
>> > > > 2. Modify master to allow you to configure max-in-flights > 1 (line
>> 185
>> > > of
>> > > > KafkaConfig.scala).
>> > > > 3. Try setting acks to 0 (it's 1 by default).
>> > > >
>> > > > Try running your tests at every one of these steps, and see how it
>> > > affects
>> > > > performance. If you get to 3, and things are still slow, we can
>> loop in
>> > > > some Kakfa-dev folks.
>> > > >
>> > > > Cheers,
>> > > > Chris
>> > > >
>> > > > On Fri, Feb 6, 2015 at 12:00 AM, Jordan Shaw <jo...@pubnub.com>
>> > wrote:
>> > > >
>> > > >> Hi everyone,
>> > > >> I've done some raw Disk, Kafka and Samza benchmarking. I peaked
>> out a
>> > > >> single Samza container's consumer at around 2MB/s. Running a Kafka
>> > > >> Consumer
>> > > >> Perf test though on the same machine I can do 100's of MB/s. It
>> seems
>> > > like
>> > > >> most of the bottleneck exists in the Kafka async client. There
>> appears
>> > > to
>> > > >> be only 1 thread in the Kafka client rather than a thread pool and
>> due
>> > > to
>> > > >> the limitation that a container can't run on multiple cores this
>> > thread
>> > > >> gets scheduled I assume on the same core as the consumer and
>> process
>> > > call.
>> > > >>
>> > > >> I know a lot thought has been put into the design of maintaining
>> > parity
>> > > >> between task instances and partitions and preventing unpredictable
>> > > >> behavior
>> > > >> from a threaded system. A reasonable solution might be to just add
>> > > >> partitions and increase container count with the partition count.
>> This
>> > > is
>> > > >> at the cost of increasing memory usage on the node managers
>> > necessarily
>> > > >> due
>> > > >> to the increased container count.
>> > > >>
>> > > >> Has there been any design discussions into allowing multiple cores
>> on
>> > > on a
>> > > >> single container to allow better pipelining within the container to
>> > get
>> > > >> better throughput and also introducing a thread pool outside of
>> > Kafka's
>> > > >> client to allow concurrent produces to Kafka within the same
>> > container?
>> > > I
>> > > >> understand there are ordering concerns with this concurrency and
>> for
>> > > those
>> > > >> sensitive use cases the thread pool could be 1 but for use cases
>> where
>> > > >> ordering is less important and raw throughput is more of a concern
>> > they
>> > > >> can
>> > > >> achieve that with allowing current async produces. I also know that
>> > > Kafka
>> > > >> has plans to rework their producer but I haven't been able to find
>> if
>> > > this
>> > > >> includes introducing a thread pool to allow multiple async
>> produces.
>> > > >> Lastly, has anyone been able to get more MB/s out of a container
>> than
>> > > what
>> > > >> I have? Thanks!
>> > > >>
>> > > >> --
>> > > >> Jordan
>> > > >>
>> > > >
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > Jordan Shaw
>> > Full Stack Software Engineer
>> > PubNub Inc
>> > 1045 17th St
>> > San Francisco, CA 94107
>> >
>>
>
>
>
> --
> Jordan Shaw
> Full Stack Software Engineer
> PubNub Inc
> 1045 17th St
> San Francisco, CA 94107
>



-- 
Jordan Shaw
Full Stack Software Engineer
PubNub Inc
1045 17th St
San Francisco, CA 94107

Re: container concurrency and pipelining

Posted by Jordan Shaw <jo...@pubnub.com>.
Hey Chris,
We've done pretty extensive testing already on that task. Here's a SS of a
sample of those results showing the 2MB/s rate. I haven't done those
profiling specifically, we were running htop and a network profiler to get
a general idea of system consumption. We'll add that to our todo's for
testing.

[image: Inline image 1]

Yesterday I was trying to get get your zopkio task to run on our cluster
and see if I get better through put. I almost got there but the
zopkio(python) kafka client wasn't connecting to my kafka cluster so
working on resolving those kind of issues and hopefully get that done
today. This is the error I was getting from zopkio:

[Errno 111] Connection refused

Other messages:

Traceback (most recent call last):
  File
"/tmp/samza-tests/samza-integration-tests/local/lib/python2.7/site-packages/zopkio/test_runner.py",
line 323, in _execute_single_test
    test.function()
  File "/tmp/samza-tests/scripts/tests/performance_tests.py", line 38, in
test_kafka_read_write_performance
    _load_data()
  File "/tmp/samza-tests/scripts/tests/performance_tests.py", line 67, in
_load_data
    kafka = util.get_kafka_client()
  File "/tmp/samza-tests/scripts/tests/util.py", line 60, in
get_kafka_client
    if not wait_for_server(kafka_hosts[0], kafka_port, 30):
  File "/tmp/samza-tests/scripts/tests/util.py", line 73, in wait_for_server
    s.connect((host, port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused

Thanks!

-Jordan

On Tue, Feb 10, 2015 at 9:02 AM, Chris Riccomini <cr...@apache.org>
wrote:

> Hey Jordan,
>
> It looks like your task is almost identical to the one in SAMZA-548. Did
> you have a chance to test your job out with 0.9?
>
> >  If I just consume off the envelope I was seeing much faster consume
> rates. Which was one of the indications that the producer was causing
> problems.
>
> Yes, this sounds believable. Did you attach visual VM, and do CPU sampling?
> It'd be good to get a view of exactly where in the "produce" call things
> are slow.
>
> Cheers,
> Chris
>
> On Sun, Feb 8, 2015 at 9:47 PM, Jordan Shaw <jo...@pubnub.com> wrote:
>
> > Hey Chris,
> > Sorry for the delayed response, did a Tahoe 3 day weekend.
> >
> > Could you post your configs, and version of Samza that you're running?
> > https://gist.github.com/jshaw86/02dbca21ae32d1a9a24e. We were running
> 0.8
> > Samza the latest stable release. We upgraded to the 0.9 branch on Friday
> > and Kafka as well so we'll go over that starting tomorrow.
> >
> > How many threads were you running? Can you describe (or post) the two
> tests
> > that you did?
> > We ran a few different thread combinations of kafka-consumer-perf,sh and
> > kafka-producer-perf.sh, 1 or 3 Producer Threads, 1 or N Consumer Threads
> > where N = Partition Count. We only wen't up to 2 partitions though. We
> just
> > ran the consumer and producer perf tests individually( not concurrently )
> > here are the results:
> https://gist.github.com/jshaw86/0bdd4d5bb1e233cd0b3f
> >
> > Here is task I was using to do the Samza perf of the consumer and
> producer:
> > https://gist.github.com/jshaw86/9c09a16112eee440f681. It's pretty basic
> > the
> > idea is just get a message off the envelope and send it back out as fast
> as
> > possible and that's where I was seeing the 2MB/s. If I just consume off
> the
> > envelope I was seeing much faster consume rates. Which was one of the
> > indications that the producer was causing problems.
> >
> > Thanks a lot for your perf tests I'll review it tomorrow against my
> configs
> > and see if I can come up with what I'm doing wrong. I also submitted the
> > JIRA per your request. Cheers.
> >
> >
> > On Sun, Feb 8, 2015 at 8:21 PM, Chris Riccomini <cr...@apache.org>
> > wrote:
> >
> > > Hey Jordan,
> > >
> > > I've put up a perf test on:
> > >
> > >   https://issues.apache.org/jira/browse/SAMZA-548
> > >
> > > The JIRA describes the test implementation, observed performance, and
> > noted
> > > deficiencies in the test. I'm getting much more than 2mb/s.
> > >
> > > Cheers,
> > > Chris
> > >
> > > On Fri, Feb 6, 2015 at 8:34 AM, Chris Riccomini <criccomini@apache.org
> >
> > > wrote:
> > >
> > > > Hey Jordan,
> > > >
> > > > > I peaked out a single Samza container's consumer at around 2MB/s.
> > > >
> > > > Could you post your configs, and version of Samza that you're
> running?
> > > >
> > > > > Running a Kafka Consumer Perf test though on the same machine I can
> > do
> > > > 100's of MB/s.
> > > >
> > > > How many threads were you running? Also, you're saying "consumer
> perf"
> > > > here. Consumer and producer exhibit very different throughput
> > > > characteristics. Can you describe (or post) the two tests that you
> did?
> > > >
> > > > > It seems like most of the bottleneck exists in the Kafka async
> > client.
> > > >
> > > > Yes, this is what we've observed as well.
> > > >
> > > > > A reasonable solution might be to just add partitions and increase
> > > > container count with the partition count.
> > > >
> > > > This is usually the guidance that we give. If you have 8 cores, and
> > want
> > > > to max out your machine, you should run 8 containers.
> > > >
> > > > > Has there been any design discussions into allowing multiple cores
> on
> > > > on a single container to allow better pipelining within the
> container?
> > > >
> > > > The discussion pretty much is what you've just described. We never
> felt
> > > > that the increase in code complexity, configs, mental model was worth
> > the
> > > > trade-off. My argument is that we should make the Kafka producer go
> > > faster
> > > > (see comments below), rather than increasing complexity in Samza to
> get
> > > > around it.
> > > >
> > > > > I also know that Kafka has plans to rework their producer but I
> > haven't
> > > > been able to find if this includes introducing a thread pool to allow
> > > > multiple async produces.
> > > >
> > > > We have upgraded Samza to the new producer in SAMZA-227. The code
> > changes
> > > > are on master now. You should definitely check that out.
> > > >
> > > > The new Kafka producer works as follows: there is one "sender"
> thread.
> > > > When you send messages, the messages get queued up, and the sender
> > thread
> > > > takes them off the queue, and sends them to Kafka. One trick with the
> > new
> > > > producer is that they are using NIO, and allow for pipelining. This
> is
> > > > *specifically* to address the point you made about those that care
> more
> > > > about throughput than ordering guarantees. The config of interest to
> > you
> > > is:
> > > >
> > > >   max.in.flight.requests.per.connection
> > > >
> > > > This defines how many parallel sends can be pipelined (over one
> socket,
> > > in
> > > > the sender thread) before the send thread blocks. Samza forces this
> to
> > 1
> > > > right now (because we wanted to guarantee ordering). It seems like a
> > > > reasonable request to allow users to over-ride this with their own
> > > setting
> > > > if they want more parallelism. Could you open a JIRA for that?
> > > >
> > > > I should note, in smoke tests, with max-in-flight set to one in
> Samza,
> > > the
> > > > perf seemed roughly on-par with the Samza running the old Kafka
> > > producer. I
> > > > also spoke to Jay at the last Kafka meetup, and he mentioned that
> they
> > > > don't see much of a performance boost when running max-in-flight > 1.
> > Jun
> > > > did some perf comparison between the old and new Kafka producer, and
> > put
> > > > the information on some slides that he presented at the meetup. If
> > you're
> > > > interested, you should ping them on the Kafka mailing list.
> > > >
> > > > > Lastly, has anyone been able to get more MB/s out of a container
> than
> > > > what I have?
> > > >
> > > > Thus far, I (personally) haven't spent much time on producer-side
> > > > optimization, so I don't have hard numbers on it. Our producer code
> is
> > > > pretty thin, so we're pretty much bound to what the Kafka producer
> can
> > > > do.If you're up for it, you might want to contribute something to:
> > > >
> > > >   https://issues.apache.org/jira/browse/SAMZA-6
> > > >
> > > > Here's what I'd recommend:
> > > >
> > > > 0. Write something reproducible and post it on SAMZA-6. For bonus
> > points,
> > > > write an equivalent raw-Kafka-producer test (no Samza) so we can
> > compare
> > > > them.
> > > > 1. Checkout master.
> > > > 2. Modify master to allow you to configure max-in-flights > 1 (line
> 185
> > > of
> > > > KafkaConfig.scala).
> > > > 3. Try setting acks to 0 (it's 1 by default).
> > > >
> > > > Try running your tests at every one of these steps, and see how it
> > > affects
> > > > performance. If you get to 3, and things are still slow, we can loop
> in
> > > > some Kakfa-dev folks.
> > > >
> > > > Cheers,
> > > > Chris
> > > >
> > > > On Fri, Feb 6, 2015 at 12:00 AM, Jordan Shaw <jo...@pubnub.com>
> > wrote:
> > > >
> > > >> Hi everyone,
> > > >> I've done some raw Disk, Kafka and Samza benchmarking. I peaked out
> a
> > > >> single Samza container's consumer at around 2MB/s. Running a Kafka
> > > >> Consumer
> > > >> Perf test though on the same machine I can do 100's of MB/s. It
> seems
> > > like
> > > >> most of the bottleneck exists in the Kafka async client. There
> appears
> > > to
> > > >> be only 1 thread in the Kafka client rather than a thread pool and
> due
> > > to
> > > >> the limitation that a container can't run on multiple cores this
> > thread
> > > >> gets scheduled I assume on the same core as the consumer and process
> > > call.
> > > >>
> > > >> I know a lot thought has been put into the design of maintaining
> > parity
> > > >> between task instances and partitions and preventing unpredictable
> > > >> behavior
> > > >> from a threaded system. A reasonable solution might be to just add
> > > >> partitions and increase container count with the partition count.
> This
> > > is
> > > >> at the cost of increasing memory usage on the node managers
> > necessarily
> > > >> due
> > > >> to the increased container count.
> > > >>
> > > >> Has there been any design discussions into allowing multiple cores
> on
> > > on a
> > > >> single container to allow better pipelining within the container to
> > get
> > > >> better throughput and also introducing a thread pool outside of
> > Kafka's
> > > >> client to allow concurrent produces to Kafka within the same
> > container?
> > > I
> > > >> understand there are ordering concerns with this concurrency and for
> > > those
> > > >> sensitive use cases the thread pool could be 1 but for use cases
> where
> > > >> ordering is less important and raw throughput is more of a concern
> > they
> > > >> can
> > > >> achieve that with allowing current async produces. I also know that
> > > Kafka
> > > >> has plans to rework their producer but I haven't been able to find
> if
> > > this
> > > >> includes introducing a thread pool to allow multiple async produces.
> > > >> Lastly, has anyone been able to get more MB/s out of a container
> than
> > > what
> > > >> I have? Thanks!
> > > >>
> > > >> --
> > > >> Jordan
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > Jordan Shaw
> > Full Stack Software Engineer
> > PubNub Inc
> > 1045 17th St
> > San Francisco, CA 94107
> >
>



-- 
Jordan Shaw
Full Stack Software Engineer
PubNub Inc
1045 17th St
San Francisco, CA 94107

Re: container concurrency and pipelining

Posted by Chris Riccomini <cr...@apache.org>.
Hey Jordan,

It looks like your task is almost identical to the one in SAMZA-548. Did
you have a chance to test your job out with 0.9?

>  If I just consume off the envelope I was seeing much faster consume
rates. Which was one of the indications that the producer was causing
problems.

Yes, this sounds believable. Did you attach visual VM, and do CPU sampling?
It'd be good to get a view of exactly where in the "produce" call things
are slow.

Cheers,
Chris

On Sun, Feb 8, 2015 at 9:47 PM, Jordan Shaw <jo...@pubnub.com> wrote:

> Hey Chris,
> Sorry for the delayed response, did a Tahoe 3 day weekend.
>
> Could you post your configs, and version of Samza that you're running?
> https://gist.github.com/jshaw86/02dbca21ae32d1a9a24e. We were running 0.8
> Samza the latest stable release. We upgraded to the 0.9 branch on Friday
> and Kafka as well so we'll go over that starting tomorrow.
>
> How many threads were you running? Can you describe (or post) the two tests
> that you did?
> We ran a few different thread combinations of kafka-consumer-perf,sh and
> kafka-producer-perf.sh, 1 or 3 Producer Threads, 1 or N Consumer Threads
> where N = Partition Count. We only wen't up to 2 partitions though. We just
> ran the consumer and producer perf tests individually( not concurrently )
> here are the results: https://gist.github.com/jshaw86/0bdd4d5bb1e233cd0b3f
>
> Here is task I was using to do the Samza perf of the consumer and producer:
> https://gist.github.com/jshaw86/9c09a16112eee440f681. It's pretty basic
> the
> idea is just get a message off the envelope and send it back out as fast as
> possible and that's where I was seeing the 2MB/s. If I just consume off the
> envelope I was seeing much faster consume rates. Which was one of the
> indications that the producer was causing problems.
>
> Thanks a lot for your perf tests I'll review it tomorrow against my configs
> and see if I can come up with what I'm doing wrong. I also submitted the
> JIRA per your request. Cheers.
>
>
> On Sun, Feb 8, 2015 at 8:21 PM, Chris Riccomini <cr...@apache.org>
> wrote:
>
> > Hey Jordan,
> >
> > I've put up a perf test on:
> >
> >   https://issues.apache.org/jira/browse/SAMZA-548
> >
> > The JIRA describes the test implementation, observed performance, and
> noted
> > deficiencies in the test. I'm getting much more than 2mb/s.
> >
> > Cheers,
> > Chris
> >
> > On Fri, Feb 6, 2015 at 8:34 AM, Chris Riccomini <cr...@apache.org>
> > wrote:
> >
> > > Hey Jordan,
> > >
> > > > I peaked out a single Samza container's consumer at around 2MB/s.
> > >
> > > Could you post your configs, and version of Samza that you're running?
> > >
> > > > Running a Kafka Consumer Perf test though on the same machine I can
> do
> > > 100's of MB/s.
> > >
> > > How many threads were you running? Also, you're saying "consumer perf"
> > > here. Consumer and producer exhibit very different throughput
> > > characteristics. Can you describe (or post) the two tests that you did?
> > >
> > > > It seems like most of the bottleneck exists in the Kafka async
> client.
> > >
> > > Yes, this is what we've observed as well.
> > >
> > > > A reasonable solution might be to just add partitions and increase
> > > container count with the partition count.
> > >
> > > This is usually the guidance that we give. If you have 8 cores, and
> want
> > > to max out your machine, you should run 8 containers.
> > >
> > > > Has there been any design discussions into allowing multiple cores on
> > > on a single container to allow better pipelining within the container?
> > >
> > > The discussion pretty much is what you've just described. We never felt
> > > that the increase in code complexity, configs, mental model was worth
> the
> > > trade-off. My argument is that we should make the Kafka producer go
> > faster
> > > (see comments below), rather than increasing complexity in Samza to get
> > > around it.
> > >
> > > > I also know that Kafka has plans to rework their producer but I
> haven't
> > > been able to find if this includes introducing a thread pool to allow
> > > multiple async produces.
> > >
> > > We have upgraded Samza to the new producer in SAMZA-227. The code
> changes
> > > are on master now. You should definitely check that out.
> > >
> > > The new Kafka producer works as follows: there is one "sender" thread.
> > > When you send messages, the messages get queued up, and the sender
> thread
> > > takes them off the queue, and sends them to Kafka. One trick with the
> new
> > > producer is that they are using NIO, and allow for pipelining. This is
> > > *specifically* to address the point you made about those that care more
> > > about throughput than ordering guarantees. The config of interest to
> you
> > is:
> > >
> > >   max.in.flight.requests.per.connection
> > >
> > > This defines how many parallel sends can be pipelined (over one socket,
> > in
> > > the sender thread) before the send thread blocks. Samza forces this to
> 1
> > > right now (because we wanted to guarantee ordering). It seems like a
> > > reasonable request to allow users to over-ride this with their own
> > setting
> > > if they want more parallelism. Could you open a JIRA for that?
> > >
> > > I should note, in smoke tests, with max-in-flight set to one in Samza,
> > the
> > > perf seemed roughly on-par with the Samza running the old Kafka
> > producer. I
> > > also spoke to Jay at the last Kafka meetup, and he mentioned that they
> > > don't see much of a performance boost when running max-in-flight > 1.
> Jun
> > > did some perf comparison between the old and new Kafka producer, and
> put
> > > the information on some slides that he presented at the meetup. If
> you're
> > > interested, you should ping them on the Kafka mailing list.
> > >
> > > > Lastly, has anyone been able to get more MB/s out of a container than
> > > what I have?
> > >
> > > Thus far, I (personally) haven't spent much time on producer-side
> > > optimization, so I don't have hard numbers on it. Our producer code is
> > > pretty thin, so we're pretty much bound to what the Kafka producer can
> > > do.If you're up for it, you might want to contribute something to:
> > >
> > >   https://issues.apache.org/jira/browse/SAMZA-6
> > >
> > > Here's what I'd recommend:
> > >
> > > 0. Write something reproducible and post it on SAMZA-6. For bonus
> points,
> > > write an equivalent raw-Kafka-producer test (no Samza) so we can
> compare
> > > them.
> > > 1. Checkout master.
> > > 2. Modify master to allow you to configure max-in-flights > 1 (line 185
> > of
> > > KafkaConfig.scala).
> > > 3. Try setting acks to 0 (it's 1 by default).
> > >
> > > Try running your tests at every one of these steps, and see how it
> > affects
> > > performance. If you get to 3, and things are still slow, we can loop in
> > > some Kakfa-dev folks.
> > >
> > > Cheers,
> > > Chris
> > >
> > > On Fri, Feb 6, 2015 at 12:00 AM, Jordan Shaw <jo...@pubnub.com>
> wrote:
> > >
> > >> Hi everyone,
> > >> I've done some raw Disk, Kafka and Samza benchmarking. I peaked out a
> > >> single Samza container's consumer at around 2MB/s. Running a Kafka
> > >> Consumer
> > >> Perf test though on the same machine I can do 100's of MB/s. It seems
> > like
> > >> most of the bottleneck exists in the Kafka async client. There appears
> > to
> > >> be only 1 thread in the Kafka client rather than a thread pool and due
> > to
> > >> the limitation that a container can't run on multiple cores this
> thread
> > >> gets scheduled I assume on the same core as the consumer and process
> > call.
> > >>
> > >> I know a lot thought has been put into the design of maintaining
> parity
> > >> between task instances and partitions and preventing unpredictable
> > >> behavior
> > >> from a threaded system. A reasonable solution might be to just add
> > >> partitions and increase container count with the partition count. This
> > is
> > >> at the cost of increasing memory usage on the node managers
> necessarily
> > >> due
> > >> to the increased container count.
> > >>
> > >> Has there been any design discussions into allowing multiple cores on
> > on a
> > >> single container to allow better pipelining within the container to
> get
> > >> better throughput and also introducing a thread pool outside of
> Kafka's
> > >> client to allow concurrent produces to Kafka within the same
> container?
> > I
> > >> understand there are ordering concerns with this concurrency and for
> > those
> > >> sensitive use cases the thread pool could be 1 but for use cases where
> > >> ordering is less important and raw throughput is more of a concern
> they
> > >> can
> > >> achieve that with allowing current async produces. I also know that
> > Kafka
> > >> has plans to rework their producer but I haven't been able to find if
> > this
> > >> includes introducing a thread pool to allow multiple async produces.
> > >> Lastly, has anyone been able to get more MB/s out of a container than
> > what
> > >> I have? Thanks!
> > >>
> > >> --
> > >> Jordan
> > >>
> > >
> > >
> >
>
>
>
> --
> Jordan Shaw
> Full Stack Software Engineer
> PubNub Inc
> 1045 17th St
> San Francisco, CA 94107
>

Re: container concurrency and pipelining

Posted by Jordan Shaw <jo...@pubnub.com>.
Hey Chris,
Sorry for the delayed response, did a Tahoe 3 day weekend.

Could you post your configs, and version of Samza that you're running?
https://gist.github.com/jshaw86/02dbca21ae32d1a9a24e. We were running 0.8
Samza the latest stable release. We upgraded to the 0.9 branch on Friday
and Kafka as well so we'll go over that starting tomorrow.

How many threads were you running? Can you describe (or post) the two tests
that you did?
We ran a few different thread combinations of kafka-consumer-perf,sh and
kafka-producer-perf.sh, 1 or 3 Producer Threads, 1 or N Consumer Threads
where N = Partition Count. We only wen't up to 2 partitions though. We just
ran the consumer and producer perf tests individually( not concurrently )
here are the results: https://gist.github.com/jshaw86/0bdd4d5bb1e233cd0b3f

Here is task I was using to do the Samza perf of the consumer and producer:
https://gist.github.com/jshaw86/9c09a16112eee440f681. It's pretty basic the
idea is just get a message off the envelope and send it back out as fast as
possible and that's where I was seeing the 2MB/s. If I just consume off the
envelope I was seeing much faster consume rates. Which was one of the
indications that the producer was causing problems.

Thanks a lot for your perf tests I'll review it tomorrow against my configs
and see if I can come up with what I'm doing wrong. I also submitted the
JIRA per your request. Cheers.


On Sun, Feb 8, 2015 at 8:21 PM, Chris Riccomini <cr...@apache.org>
wrote:

> Hey Jordan,
>
> I've put up a perf test on:
>
>   https://issues.apache.org/jira/browse/SAMZA-548
>
> The JIRA describes the test implementation, observed performance, and noted
> deficiencies in the test. I'm getting much more than 2mb/s.
>
> Cheers,
> Chris
>
> On Fri, Feb 6, 2015 at 8:34 AM, Chris Riccomini <cr...@apache.org>
> wrote:
>
> > Hey Jordan,
> >
> > > I peaked out a single Samza container's consumer at around 2MB/s.
> >
> > Could you post your configs, and version of Samza that you're running?
> >
> > > Running a Kafka Consumer Perf test though on the same machine I can do
> > 100's of MB/s.
> >
> > How many threads were you running? Also, you're saying "consumer perf"
> > here. Consumer and producer exhibit very different throughput
> > characteristics. Can you describe (or post) the two tests that you did?
> >
> > > It seems like most of the bottleneck exists in the Kafka async client.
> >
> > Yes, this is what we've observed as well.
> >
> > > A reasonable solution might be to just add partitions and increase
> > container count with the partition count.
> >
> > This is usually the guidance that we give. If you have 8 cores, and want
> > to max out your machine, you should run 8 containers.
> >
> > > Has there been any design discussions into allowing multiple cores on
> > on a single container to allow better pipelining within the container?
> >
> > The discussion pretty much is what you've just described. We never felt
> > that the increase in code complexity, configs, mental model was worth the
> > trade-off. My argument is that we should make the Kafka producer go
> faster
> > (see comments below), rather than increasing complexity in Samza to get
> > around it.
> >
> > > I also know that Kafka has plans to rework their producer but I haven't
> > been able to find if this includes introducing a thread pool to allow
> > multiple async produces.
> >
> > We have upgraded Samza to the new producer in SAMZA-227. The code changes
> > are on master now. You should definitely check that out.
> >
> > The new Kafka producer works as follows: there is one "sender" thread.
> > When you send messages, the messages get queued up, and the sender thread
> > takes them off the queue, and sends them to Kafka. One trick with the new
> > producer is that they are using NIO, and allow for pipelining. This is
> > *specifically* to address the point you made about those that care more
> > about throughput than ordering guarantees. The config of interest to you
> is:
> >
> >   max.in.flight.requests.per.connection
> >
> > This defines how many parallel sends can be pipelined (over one socket,
> in
> > the sender thread) before the send thread blocks. Samza forces this to 1
> > right now (because we wanted to guarantee ordering). It seems like a
> > reasonable request to allow users to over-ride this with their own
> setting
> > if they want more parallelism. Could you open a JIRA for that?
> >
> > I should note, in smoke tests, with max-in-flight set to one in Samza,
> the
> > perf seemed roughly on-par with the Samza running the old Kafka
> producer. I
> > also spoke to Jay at the last Kafka meetup, and he mentioned that they
> > don't see much of a performance boost when running max-in-flight > 1. Jun
> > did some perf comparison between the old and new Kafka producer, and put
> > the information on some slides that he presented at the meetup. If you're
> > interested, you should ping them on the Kafka mailing list.
> >
> > > Lastly, has anyone been able to get more MB/s out of a container than
> > what I have?
> >
> > Thus far, I (personally) haven't spent much time on producer-side
> > optimization, so I don't have hard numbers on it. Our producer code is
> > pretty thin, so we're pretty much bound to what the Kafka producer can
> > do.If you're up for it, you might want to contribute something to:
> >
> >   https://issues.apache.org/jira/browse/SAMZA-6
> >
> > Here's what I'd recommend:
> >
> > 0. Write something reproducible and post it on SAMZA-6. For bonus points,
> > write an equivalent raw-Kafka-producer test (no Samza) so we can compare
> > them.
> > 1. Checkout master.
> > 2. Modify master to allow you to configure max-in-flights > 1 (line 185
> of
> > KafkaConfig.scala).
> > 3. Try setting acks to 0 (it's 1 by default).
> >
> > Try running your tests at every one of these steps, and see how it
> affects
> > performance. If you get to 3, and things are still slow, we can loop in
> > some Kakfa-dev folks.
> >
> > Cheers,
> > Chris
> >
> > On Fri, Feb 6, 2015 at 12:00 AM, Jordan Shaw <jo...@pubnub.com> wrote:
> >
> >> Hi everyone,
> >> I've done some raw Disk, Kafka and Samza benchmarking. I peaked out a
> >> single Samza container's consumer at around 2MB/s. Running a Kafka
> >> Consumer
> >> Perf test though on the same machine I can do 100's of MB/s. It seems
> like
> >> most of the bottleneck exists in the Kafka async client. There appears
> to
> >> be only 1 thread in the Kafka client rather than a thread pool and due
> to
> >> the limitation that a container can't run on multiple cores this thread
> >> gets scheduled I assume on the same core as the consumer and process
> call.
> >>
> >> I know a lot thought has been put into the design of maintaining parity
> >> between task instances and partitions and preventing unpredictable
> >> behavior
> >> from a threaded system. A reasonable solution might be to just add
> >> partitions and increase container count with the partition count. This
> is
> >> at the cost of increasing memory usage on the node managers necessarily
> >> due
> >> to the increased container count.
> >>
> >> Has there been any design discussions into allowing multiple cores on
> on a
> >> single container to allow better pipelining within the container to get
> >> better throughput and also introducing a thread pool outside of Kafka's
> >> client to allow concurrent produces to Kafka within the same container?
> I
> >> understand there are ordering concerns with this concurrency and for
> those
> >> sensitive use cases the thread pool could be 1 but for use cases where
> >> ordering is less important and raw throughput is more of a concern they
> >> can
> >> achieve that with allowing current async produces. I also know that
> Kafka
> >> has plans to rework their producer but I haven't been able to find if
> this
> >> includes introducing a thread pool to allow multiple async produces.
> >> Lastly, has anyone been able to get more MB/s out of a container than
> what
> >> I have? Thanks!
> >>
> >> --
> >> Jordan
> >>
> >
> >
>



-- 
Jordan Shaw
Full Stack Software Engineer
PubNub Inc
1045 17th St
San Francisco, CA 94107

Re: container concurrency and pipelining

Posted by Chris Riccomini <cr...@apache.org>.
Hey Jordan,

I've put up a perf test on:

  https://issues.apache.org/jira/browse/SAMZA-548

The JIRA describes the test implementation, observed performance, and noted
deficiencies in the test. I'm getting much more than 2mb/s.

Cheers,
Chris

On Fri, Feb 6, 2015 at 8:34 AM, Chris Riccomini <cr...@apache.org>
wrote:

> Hey Jordan,
>
> > I peaked out a single Samza container's consumer at around 2MB/s.
>
> Could you post your configs, and version of Samza that you're running?
>
> > Running a Kafka Consumer Perf test though on the same machine I can do
> 100's of MB/s.
>
> How many threads were you running? Also, you're saying "consumer perf"
> here. Consumer and producer exhibit very different throughput
> characteristics. Can you describe (or post) the two tests that you did?
>
> > It seems like most of the bottleneck exists in the Kafka async client.
>
> Yes, this is what we've observed as well.
>
> > A reasonable solution might be to just add partitions and increase
> container count with the partition count.
>
> This is usually the guidance that we give. If you have 8 cores, and want
> to max out your machine, you should run 8 containers.
>
> > Has there been any design discussions into allowing multiple cores on
> on a single container to allow better pipelining within the container?
>
> The discussion pretty much is what you've just described. We never felt
> that the increase in code complexity, configs, mental model was worth the
> trade-off. My argument is that we should make the Kafka producer go faster
> (see comments below), rather than increasing complexity in Samza to get
> around it.
>
> > I also know that Kafka has plans to rework their producer but I haven't
> been able to find if this includes introducing a thread pool to allow
> multiple async produces.
>
> We have upgraded Samza to the new producer in SAMZA-227. The code changes
> are on master now. You should definitely check that out.
>
> The new Kafka producer works as follows: there is one "sender" thread.
> When you send messages, the messages get queued up, and the sender thread
> takes them off the queue, and sends them to Kafka. One trick with the new
> producer is that they are using NIO, and allow for pipelining. This is
> *specifically* to address the point you made about those that care more
> about throughput than ordering guarantees. The config of interest to you is:
>
>   max.in.flight.requests.per.connection
>
> This defines how many parallel sends can be pipelined (over one socket, in
> the sender thread) before the send thread blocks. Samza forces this to 1
> right now (because we wanted to guarantee ordering). It seems like a
> reasonable request to allow users to over-ride this with their own setting
> if they want more parallelism. Could you open a JIRA for that?
>
> I should note, in smoke tests, with max-in-flight set to one in Samza, the
> perf seemed roughly on-par with the Samza running the old Kafka producer. I
> also spoke to Jay at the last Kafka meetup, and he mentioned that they
> don't see much of a performance boost when running max-in-flight > 1. Jun
> did some perf comparison between the old and new Kafka producer, and put
> the information on some slides that he presented at the meetup. If you're
> interested, you should ping them on the Kafka mailing list.
>
> > Lastly, has anyone been able to get more MB/s out of a container than
> what I have?
>
> Thus far, I (personally) haven't spent much time on producer-side
> optimization, so I don't have hard numbers on it. Our producer code is
> pretty thin, so we're pretty much bound to what the Kafka producer can
> do.If you're up for it, you might want to contribute something to:
>
>   https://issues.apache.org/jira/browse/SAMZA-6
>
> Here's what I'd recommend:
>
> 0. Write something reproducible and post it on SAMZA-6. For bonus points,
> write an equivalent raw-Kafka-producer test (no Samza) so we can compare
> them.
> 1. Checkout master.
> 2. Modify master to allow you to configure max-in-flights > 1 (line 185 of
> KafkaConfig.scala).
> 3. Try setting acks to 0 (it's 1 by default).
>
> Try running your tests at every one of these steps, and see how it affects
> performance. If you get to 3, and things are still slow, we can loop in
> some Kakfa-dev folks.
>
> Cheers,
> Chris
>
> On Fri, Feb 6, 2015 at 12:00 AM, Jordan Shaw <jo...@pubnub.com> wrote:
>
>> Hi everyone,
>> I've done some raw Disk, Kafka and Samza benchmarking. I peaked out a
>> single Samza container's consumer at around 2MB/s. Running a Kafka
>> Consumer
>> Perf test though on the same machine I can do 100's of MB/s. It seems like
>> most of the bottleneck exists in the Kafka async client. There appears to
>> be only 1 thread in the Kafka client rather than a thread pool and due to
>> the limitation that a container can't run on multiple cores this thread
>> gets scheduled I assume on the same core as the consumer and process call.
>>
>> I know a lot thought has been put into the design of maintaining parity
>> between task instances and partitions and preventing unpredictable
>> behavior
>> from a threaded system. A reasonable solution might be to just add
>> partitions and increase container count with the partition count. This is
>> at the cost of increasing memory usage on the node managers necessarily
>> due
>> to the increased container count.
>>
>> Has there been any design discussions into allowing multiple cores on on a
>> single container to allow better pipelining within the container to get
>> better throughput and also introducing a thread pool outside of Kafka's
>> client to allow concurrent produces to Kafka within the same container? I
>> understand there are ordering concerns with this concurrency and for those
>> sensitive use cases the thread pool could be 1 but for use cases where
>> ordering is less important and raw throughput is more of a concern they
>> can
>> achieve that with allowing current async produces. I also know that Kafka
>> has plans to rework their producer but I haven't been able to find if this
>> includes introducing a thread pool to allow multiple async produces.
>> Lastly, has anyone been able to get more MB/s out of a container than what
>> I have? Thanks!
>>
>> --
>> Jordan
>>
>
>

Re: container concurrency and pipelining

Posted by Chris Riccomini <cr...@apache.org>.
Hey Jordan,

> I peaked out a single Samza container's consumer at around 2MB/s.

Could you post your configs, and version of Samza that you're running?

> Running a Kafka Consumer Perf test though on the same machine I can do
100's of MB/s.

How many threads were you running? Also, you're saying "consumer perf"
here. Consumer and producer exhibit very different throughput
characteristics. Can you describe (or post) the two tests that you did?

> It seems like most of the bottleneck exists in the Kafka async client.

Yes, this is what we've observed as well.

> A reasonable solution might be to just add partitions and increase
container count with the partition count.

This is usually the guidance that we give. If you have 8 cores, and want to
max out your machine, you should run 8 containers.

> Has there been any design discussions into allowing multiple cores on on
a single container to allow better pipelining within the container?

The discussion pretty much is what you've just described. We never felt
that the increase in code complexity, configs, mental model was worth the
trade-off. My argument is that we should make the Kafka producer go faster
(see comments below), rather than increasing complexity in Samza to get
around it.

> I also know that Kafka has plans to rework their producer but I haven't
been able to find if this includes introducing a thread pool to allow
multiple async produces.

We have upgraded Samza to the new producer in SAMZA-227. The code changes
are on master now. You should definitely check that out.

The new Kafka producer works as follows: there is one "sender" thread. When
you send messages, the messages get queued up, and the sender thread takes
them off the queue, and sends them to Kafka. One trick with the new
producer is that they are using NIO, and allow for pipelining. This is
*specifically* to address the point you made about those that care more
about throughput than ordering guarantees. The config of interest to you is:

  max.in.flight.requests.per.connection

This defines how many parallel sends can be pipelined (over one socket, in
the sender thread) before the send thread blocks. Samza forces this to 1
right now (because we wanted to guarantee ordering). It seems like a
reasonable request to allow users to over-ride this with their own setting
if they want more parallelism. Could you open a JIRA for that?

I should note, in smoke tests, with max-in-flight set to one in Samza, the
perf seemed roughly on-par with the Samza running the old Kafka producer. I
also spoke to Jay at the last Kafka meetup, and he mentioned that they
don't see much of a performance boost when running max-in-flight > 1. Jun
did some perf comparison between the old and new Kafka producer, and put
the information on some slides that he presented at the meetup. If you're
interested, you should ping them on the Kafka mailing list.

> Lastly, has anyone been able to get more MB/s out of a container than
what I have?

Thus far, I (personally) haven't spent much time on producer-side
optimization, so I don't have hard numbers on it. Our producer code is
pretty thin, so we're pretty much bound to what the Kafka producer can
do.If you're up for it, you might want to contribute something to:

  https://issues.apache.org/jira/browse/SAMZA-6

Here's what I'd recommend:

0. Write something reproducible and post it on SAMZA-6. For bonus points,
write an equivalent raw-Kafka-producer test (no Samza) so we can compare
them.
1. Checkout master.
2. Modify master to allow you to configure max-in-flights > 1 (line 185 of
KafkaConfig.scala).
3. Try setting acks to 0 (it's 1 by default).

Try running your tests at every one of these steps, and see how it affects
performance. If you get to 3, and things are still slow, we can loop in
some Kakfa-dev folks.

Cheers,
Chris

On Fri, Feb 6, 2015 at 12:00 AM, Jordan Shaw <jo...@pubnub.com> wrote:

> Hi everyone,
> I've done some raw Disk, Kafka and Samza benchmarking. I peaked out a
> single Samza container's consumer at around 2MB/s. Running a Kafka Consumer
> Perf test though on the same machine I can do 100's of MB/s. It seems like
> most of the bottleneck exists in the Kafka async client. There appears to
> be only 1 thread in the Kafka client rather than a thread pool and due to
> the limitation that a container can't run on multiple cores this thread
> gets scheduled I assume on the same core as the consumer and process call.
>
> I know a lot thought has been put into the design of maintaining parity
> between task instances and partitions and preventing unpredictable behavior
> from a threaded system. A reasonable solution might be to just add
> partitions and increase container count with the partition count. This is
> at the cost of increasing memory usage on the node managers necessarily due
> to the increased container count.
>
> Has there been any design discussions into allowing multiple cores on on a
> single container to allow better pipelining within the container to get
> better throughput and also introducing a thread pool outside of Kafka's
> client to allow concurrent produces to Kafka within the same container? I
> understand there are ordering concerns with this concurrency and for those
> sensitive use cases the thread pool could be 1 but for use cases where
> ordering is less important and raw throughput is more of a concern they can
> achieve that with allowing current async produces. I also know that Kafka
> has plans to rework their producer but I haven't been able to find if this
> includes introducing a thread pool to allow multiple async produces.
> Lastly, has anyone been able to get more MB/s out of a container than what
> I have? Thanks!
>
> --
> Jordan
>