You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Arti Pande <pa...@gmail.com> on 2020/06/10 20:06:00 UTC

Flink Async IO operator tuning / micro-benchmarks

As Flink Async IO operator is designed for external API or DB calls, are
there any specific guidelines / tips for scaling up this operator?
Particularly for use-cases where incoming events are being ingested at a
very high-speed and the Async IO operator with orderedWait mode can not
keep up with that speed (although the target API endpoint it is calling is
load tested to provide much higher throughput with very minimal latency).
In our case adding Async IO operator to the pipeline *reduced the
throughput by 88% to 90%*. This is huge performance hit!

We tried a couple of things:

   1. Increasing the async buffer capacity parameter, there by increasing
   the number of concurrent requests at any given point in time that are
   waiting for response. This proved counter-productive beyond a very small
   number like 50 or 100.
   2. Increasing the operator parallelism. This does not help much as the
   number of cores on our machines are limited (8 or 12)
   3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
   maxConnections, maxConnectionsPerHost) and the size of FixedThreadPool
   used by the Listener of its Future. Again without much improvement.

Our observation is that although Async IO operator works for one stream
element at a time, the operator and its underlying HTTP client are
multithreaded and need higher core machines for high-speed stream
processing. If the only machines available for this kind of applications
are 8 to 16 cores, we face challenges in meeting the required throughput
and latency SLAs.

Are there any micro-benchmarks or tuning guidelines for using Async IO for
high-speed stream processing, so we know how much throughput to expect from
it?
Thanks & regards,
Arti

Re: Flink Async IO operator tuning / micro-benchmarks

Posted by Arvid Heise <ar...@ververica.com>.
Hi Arti,

ouch 3M is pretty far off the current setting.

Flink aside, you need to use 100 machines at the very minimum with the
current approach (AsyncHTTP and your evaluated machine). That's probably a
point where I'd try other libraries first and most importantly I'd evaluate
different machines. 100 nodes clusters are not unheard of (netflix uses a
shared 10k node cluster for Flink), but it's already big for an application
cluster and I always assume that an application grows over time and
naturally needs more resources.

First I'd measure if you are CPU or network bound. Chances are there that
you are in a cloud setting and your internet connection is just not beefy
enough. You could also check if you could have a shortcut connection with
your third party endpoint (VPC peering).

If you are CPU bound, I'd also try to use non-java tools like wrk2[1] to
just measure how fast you can get on a single machine. I'd use that tool to
also check other machine types.

Just by briefly checking some HTTP benchmarks, I see that 3M/s seems to be
a really high number even for 2020. It feels like a more sophisticated
approach than brute forcing may save lots of money. Is it possible to cache
results? Is it possible to do batch requests (100 at a time)?

Indeed it feels as if Flink is not the perfect fit, but there is no other
technology that pops immediately into my mind that would be able to
naturally perform 3M/s http requests with minimal resource usage. It's just
a huge goal.

[1] https://github.com/giltene/wrk2

On Fri, Jun 12, 2020 at 4:20 PM Arti Pande <pa...@gmail.com> wrote:

> Hi Arvid,
>
> *Shared api client*: Actually in the flow of writing I missed to mention
> that we switched to a static shared instance of async http client for all 7
> subtasks of the AsyncIO. The number of threads therefore is not 140 (20 *
> 7) but just (16 + 8 or 16 = 24 or 32) which includes a static shared thread
> pool for the listener of the response Future to deserialize and process the
> response body before it is emitted out.
>
> *Single operator:* With parallelism level 1 we experimented with
> following things, but there was very high backpressure on the upstream
> operator and overall pipeline throughput was unacceptably low
>
> (a) async buffer capacity value - beyond 100 there was a drop in throughput
> (b) io thread-pool in async http client - reducing the default size
> reduced the throughput as well
>
> (c) thread pool size for the listener of response Future
>
> From a scaling out perspective we definitely need to scale this out to be
> able to support about three million records per second and as per the
> experiments and benchmarks done till now, it appears that we will need many
> higher-core machines in a larger cluster. The intention of posting this
> question here is to validate and find if anything similar has been done by
> someone on lower-core machines with success.
>
> Thanks & regards,
> Arti
>
>
> On Fri, Jun 12, 2020 at 7:00 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Arti,
>>
>> Thank you very much for providing so much information.
>>
>> One additional test that you could do is to check how the pipeline
>> performs by mocking the actual HTTP request and directly return a static
>> response through Async IO. This would give you an exact number including
>> potential serialization costs. I often see users focusing on the
>> performance of a specific operator, while the bottleneck is the shuffle
>> step before or after that (you get a shuffle if you have keyby or change
>> the degree of parallelism).
>>
>> But I'm going forward by the assumption that this is indeed completely
>> the AsyncIO itself that is causing the loss of performance. So for me, it
>> looks like Flink is a factor of 2 to 3 slower than the native approach
>> (exact number is hard to give, as there are more steps involved in Flink).
>> My guess is that the sheer number of threads might cause too many context
>> switches and the drop of performance. Are you starting an AsyncHTTPClient
>> thread pool per subtask or do you have it shared (by using a static
>> variable initialized once in RichAsyncFunction#open)? It sounded like the
>> former, so I'd definitely recommend trying the latter approach.
>>
>> One additional thing that you could try is actually use a parallelism of
>> 1 on that AsyncIO and just tweak that one thread pool according to your
>> non-Flink test. But that's usually harder to scale out.
>>
>>
>> To your final question, if it's the right fit: In general, the strength
>> of most distributed stream processors is distributed computing. In your
>> case, it will probably always happen that you will hit the limit on one
>> machine sooner or later. Then, it's the hour of Flink to shine and actually
>> distribute the work among multiple workers.
>>
>> If you never plan to scale out, there are probably other frameworks that
>> are more suited (Akka HTTP would be a natural candidate, assuming you can
>> connect your source/sink directly).
>>
>> However, I'd probably rather encourage you to consider scaling out as a
>> natural component in your architecture. Data volume doubles roughly every
>> 18 months, so unless you buy some very beefy machine, you will hit the
>> limit sooner or later. From your description it sounds to me as if you kind
>> of envision a throughput of at least 100K rec/s. Given that even in your
>> stand-alone test with no additional transformations, you need 3-5 of your 8
>> core machines to just perform the HTTP requests. So most likely you need a
>> machine with more than 32 cores and that's the point where they get quickly
>> expensive without offering you any fault tolerance. On the other hand, if
>> you have a cluster of many smaller machines, you get a much more reliable
>> environment that is overall cheaper.
>>
>> We unfortunately still need more time to fully incorporate dynamic
>> scaling-in and out (ETA Flink 1.12 with Flink 1.11 currently being
>> finalized), then you would be able to react on slower traffic (during
>> night?) and peaks (noon, start/end of months) and get a very cost-efficient
>> system.
>>
>> On Fri, Jun 12, 2020 at 10:59 AM Arti Pande <pa...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> Thanks for quick reply and totally agree with you on the differences
>>> between microbenchmarks and a full benchmark with specific use-case. Thanks
>>> for sending the microbenchmark screenshot.
>>>
>>> For our use-case, the streaming pipeline has five main transformations
>>> that have business logic, of which Async IO to external API endpoint is one
>>> operator. To create benchmarks for operators, I run the real pipeline with
>>> full load on a single machine and note the Throughput and latency. Then add
>>> each operator one by one; always keeping the other basic operators like
>>> source, watermark generator, deserializer, sink etc turned on. The
>>> intention is to build a sort of incremental realistic benchmark for each
>>> operator for given use-case. Adding the AsyncIO operator (with parallelism
>>> 7 and async buffer capacity 100) with AsyncHTTPClient library brings
>>> throughput down from 102 K to a very low number i.e. 10K or 12 K
>>> records/sec.
>>>
>>> As you suggested, we tested the library being used (AsyncHTTPClient)
>>> independently of Flink, in a similar way to what AsyncIO does. A
>>> simple java program that invokes millions of API calls in a loop, with
>>> hard-coded POST request values, and limited (configurable) number of
>>> concurrent (maxInFlight) requests.  AsyncHTTPClient library by default uses
>>> nCores * 2 (= 16) IO threads, plus a fixed set of threads (say 2 or 4) for
>>> the ExecutorService to be passed to the Listener of result Future. So with
>>> this library the code requires at least 18 or 20 threads. Varying the
>>> maxInFlightRequests from 100 to 3000 the throughput varied from 17 K to 34
>>> K records/sec. Ofcourse this was with hard-coded POST request values and
>>> discarding the response body on reading (no further processing on it).
>>>
>>> When we tried to vary the async buffer capacity of AsyncIO (equivalent
>>> of maxInFlightRequests above) beyond 100, our throughput dropped further by
>>> 20% to 30%. Whereas in the test program above we would get better
>>> performance as we increased maxInFlightRequests from 100 to 3000.
>>>
>>> To reduce backpressure on upstream operators we had to increase the
>>> AsyncIO operator parallelism upto 7. But that means at least 20*7 = 140
>>> threads per single pipeline plus the threads of other operators in the
>>> pipeline.
>>>
>>> The question therefore is, given the pipeline is highly multithreaded
>>> can 8-core machines suit this? Also is Flink the right framework for such
>>> multi-threaded streaming pipelines that have external API calls and
>>> high-speed ingestion?
>>>
>>> Thanks & regards
>>> Arti
>>>
>>>
>>> On Thu, Jun 11, 2020 at 1:13 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Arti,
>>>>
>>>> you are now comparing a microbenchmark to a full benchmark. The job
>>>> contains multiple transformations (each dividing the power of the core) and
>>>> most importantly, it contains network traffic (shuffle) to perform a join.
>>>> Unless you do heavy transformation, your throughput will always be
>>>> bottlenecked with network traffic. At the very least, that contains
>>>> additional serialization costs. And it contains heavy states through the
>>>> window aggregation, which also results in serialization and I/O.
>>>>
>>>> I attached the screenshot from the microbenchmark. In general,
>>>> microbenchmarks are not really useful for end-users, but you explicitly
>>>> asked for them. The bottleneck usually arises from the IO part, which is
>>>> NOT benchmarked. I'm assuming you are actually more interested in whether
>>>> and how your use case can be solved in Flink rather than technical details.
>>>> First of all, it always helps to have more information about the intended
>>>> query than going directly into technical details. I gathered that you have
>>>> a third-part microservice that you need to query and you want to do
>>>> additional transformations. It would also be interesting how you performed
>>>> your benchmarks and measured the performance drop.
>>>>
>>>> First of all, even though I discouraged the use of microservices in a
>>>> stream processor, it doesn't mean that it's not possible. You just lose
>>>> some of the nice properties that are possible. 1) Your latency will
>>>> typically be much higher as if it was a data source. 2) You lose
>>>> exactly-once behavior in regard to the HTTP endpoint. On recovery, requests
>>>> since the last checkpoint will be repeated. You need to check if that makes
>>>> sense if the microservices has side-effects that don't allow that. But the
>>>> same is true for most other stream processors and can only be avoided by
>>>> using per-record commits (and then even then, this in-progress record may
>>>> result in duplicate HTTP queries on recovery). 3) If that external endpoint
>>>> is down, there is no way to do meaningful processing. So you add the
>>>> downtime of your Flink cluster and your external microservice. That becomes
>>>> especially important if you have a wide-range of microservices.
>>>>
>>>> So as I wrote in last mail, I'd first establish a boundary independent
>>>> of Flink, by running some Java program with your used async library and
>>>> tune the settings to reach saturation on one machine. That boundary becomes
>>>> your gold standard - there is no way Flink or any other stream processor
>>>> can do it better. If you publish this number, we can jointly find good
>>>> async IO settings. You can of course also see if other libraries are more
>>>> suitable for your needs, ideally implementations that use fewer threads and
>>>> more light-weight constructs to achieve the same construct. Then I'd also
>>>> measure a histogram of response times. Do you have stragglers or are
>>>> response times rather uniform? For uniformity, configuration of async I/O
>>>> is usually quite straight-forward, just use a queue length that fully
>>>> satures your async library (so maxConnections+X, where X is a small
>>>> constant safety buffer). It also helps to determine if you need to scale
>>>> out or not.
>>>>
>>>>
>>>> On Thu, Jun 11, 2020 at 9:03 AM Arti Pande <pa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Arvid,
>>>>>
>>>>> Thanks for a quick reply.
>>>>>
>>>>> The second reference link (
>>>>> http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2)
>>>>> from your answer is not accessible though. Could you share some more
>>>>> numbers from it? Are these benchmarks published somewhere?
>>>>>
>>>>> Without actual IO call, Async IO operator benchmark of 1.6 K
>>>>> records/ms per core translates to *1.6 million records/sec per core*.
>>>>> So an 8 core machine should give roughly *12.8 million records/sec* ?
>>>>> Is this the correct number? How do we compare it with this benchmark
>>>>> <https://www.ververica.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime>
>>>>> article that talks about total throughput of 4 million records/sec (without
>>>>> Async IO operator) in a cluster of about 10 machines with 16-core each?
>>>>>
>>>>> Ordered wait is indispensable for our use-case because we need to call
>>>>> the external (partner organisation) system's API endpoint for each incoming
>>>>> record. Depending on the response from that API we need to decide how to
>>>>> process this record and order needs to be preserved. This may not have been
>>>>> a problem if data ingestion rates were low. Real challenge is because of
>>>>> the high-speed stream (millions of events per second) of input.
>>>>>
>>>>> Is higher core machines an answer or is Flink not suitable for
>>>>> use-cases like this?
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 11, 2020 at 2:44 AM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Arti,
>>>>>>
>>>>>> microbenchmarks for AsyncIO are available [1] and the results shown
>>>>>> in [2]. So you can roughly expect 1.6k records/ms per core to be the upper
>>>>>> limit without any actual I/O. That range should hold for Flink 1.10 and
>>>>>> coming Flink 1.11. I cannot say much about older versions and you didn't
>>>>>> specify which you use. But it shouldn't be an order of magnitude different.
>>>>>>
>>>>>> The biggest performance improvement will probably be to switch to
>>>>>> unordered - results are emitted as soon as they arrive. On ordered, the
>>>>>> first element that came in, needs to be finished before any emission. If
>>>>>> some elements take longer than others, these slow elements quickly become a
>>>>>> bottleneck.
>>>>>>
>>>>>> If async I/O needs to be ordered, then you need to tweak what you
>>>>>> already mentioned. Set DOP to the number of physical cores, there is no
>>>>>> benefit in going higher. If you indeed use an async HTTP client, then the
>>>>>> queue size should be a bit higher than the thread pool size. The thread
>>>>>> pool size will effectively limit the parallelism per subtask and you want
>>>>>> to saturate that from the Flink side. The thread pool size (together with
>>>>>> maxConnections) will put the hard limit together with the request
>>>>>> processing time on your application.
>>>>>>
>>>>>> I'd probably consider using more machines in your stead instead of
>>>>>> more cores per machine (sounded like that is an option). So instead of
>>>>>> using 10x12 cores, use 15x8 cores. You could measure how much max
>>>>>> throughput to expect by using one machine and use a benchmarking tool that
>>>>>> increases the requests per second on that machine until it hits the limit.
>>>>>> Then you know how many machines you need at the very least.
>>>>>>
>>>>>> Finally, it might also be a good time to review your architecture.
>>>>>> Microservices are not the best fit for a streaming application. For
>>>>>> example, if this is a lookup service, it would scale and fit much better if
>>>>>> all data could be ingested by Flink as an additional data source (e.g.
>>>>>> Kafka topic). Existing microservices might be converted into such data
>>>>>> sources with change-data-capture.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
>>>>>> [2]
>>>>>> http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2
>>>>>>
>>>>>> On Wed, Jun 10, 2020 at 10:06 PM Arti Pande <pa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> As Flink Async IO operator is designed for external API or DB
>>>>>>> calls, are there any specific guidelines / tips for scaling up this
>>>>>>> operator? Particularly for use-cases where incoming events are being
>>>>>>> ingested at a very high-speed and the Async IO operator with
>>>>>>> orderedWait mode can not keep up with that speed (although the
>>>>>>> target API endpoint it is calling is load tested to provide much higher
>>>>>>> throughput with very minimal latency). In our case adding Async IO operator
>>>>>>> to the pipeline *reduced the throughput by 88% to 90%*. This is
>>>>>>> huge performance hit!
>>>>>>>
>>>>>>> We tried a couple of things:
>>>>>>>
>>>>>>>    1. Increasing the async buffer capacity parameter, there by
>>>>>>>    increasing the number of concurrent requests at any given point in time
>>>>>>>    that are waiting for response. This proved counter-productive beyond a very
>>>>>>>    small number like 50 or 100.
>>>>>>>    2. Increasing the operator parallelism. This does not help much
>>>>>>>    as the number of cores on our machines are limited (8 or 12)
>>>>>>>    3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
>>>>>>>    maxConnections, maxConnectionsPerHost) and the size of
>>>>>>>    FixedThreadPool used by the Listener of its Future. Again without much
>>>>>>>    improvement.
>>>>>>>
>>>>>>> Our observation is that although Async IO operator works for one
>>>>>>> stream element at a time, the operator and its underlying HTTP client are
>>>>>>> multithreaded and need higher core machines for high-speed stream
>>>>>>> processing. If the only machines available for this kind of applications
>>>>>>> are 8 to 16 cores, we face challenges in meeting the required throughput
>>>>>>> and latency SLAs.
>>>>>>>
>>>>>>> Are there any micro-benchmarks or tuning guidelines for using Async
>>>>>>> IO for high-speed stream processing, so we know how much throughput
>>>>>>> to expect from it?
>>>>>>> Thanks & regards,
>>>>>>> Arti
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Arvid Heise | Senior Java Developer
>>>>>>
>>>>>> <https://www.ververica.com/>
>>>>>>
>>>>>> Follow us @VervericaData
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>>> Conference
>>>>>>
>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>
>>>>>> --
>>>>>> Ververica GmbH
>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>>> Ji (Toni) Cheng
>>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng
>>>>
>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Flink Async IO operator tuning / micro-benchmarks

Posted by Arti Pande <pa...@gmail.com>.
Hi Arvid,

*Shared api client*: Actually in the flow of writing I missed to mention
that we switched to a static shared instance of async http client for all 7
subtasks of the AsyncIO. The number of threads therefore is not 140 (20 *
7) but just (16 + 8 or 16 = 24 or 32) which includes a static shared thread
pool for the listener of the response Future to deserialize and process the
response body before it is emitted out.

*Single operator:* With parallelism level 1 we experimented with following
things, but there was very high backpressure on the upstream operator and
overall pipeline throughput was unacceptably low

(a) async buffer capacity value - beyond 100 there was a drop in throughput
(b) io thread-pool in async http client - reducing the default size reduced
the throughput as well

(c) thread pool size for the listener of response Future

From a scaling out perspective we definitely need to scale this out to be
able to support about three million records per second and as per the
experiments and benchmarks done till now, it appears that we will need many
higher-core machines in a larger cluster. The intention of posting this
question here is to validate and find if anything similar has been done by
someone on lower-core machines with success.

Thanks & regards,
Arti


On Fri, Jun 12, 2020 at 7:00 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Arti,
>
> Thank you very much for providing so much information.
>
> One additional test that you could do is to check how the pipeline
> performs by mocking the actual HTTP request and directly return a static
> response through Async IO. This would give you an exact number including
> potential serialization costs. I often see users focusing on the
> performance of a specific operator, while the bottleneck is the shuffle
> step before or after that (you get a shuffle if you have keyby or change
> the degree of parallelism).
>
> But I'm going forward by the assumption that this is indeed completely the
> AsyncIO itself that is causing the loss of performance. So for me, it looks
> like Flink is a factor of 2 to 3 slower than the native approach (exact
> number is hard to give, as there are more steps involved in Flink). My
> guess is that the sheer number of threads might cause too many context
> switches and the drop of performance. Are you starting an AsyncHTTPClient
> thread pool per subtask or do you have it shared (by using a static
> variable initialized once in RichAsyncFunction#open)? It sounded like the
> former, so I'd definitely recommend trying the latter approach.
>
> One additional thing that you could try is actually use a parallelism of 1
> on that AsyncIO and just tweak that one thread pool according to your
> non-Flink test. But that's usually harder to scale out.
>
>
> To your final question, if it's the right fit: In general, the strength of
> most distributed stream processors is distributed computing. In your case,
> it will probably always happen that you will hit the limit on one machine
> sooner or later. Then, it's the hour of Flink to shine and actually
> distribute the work among multiple workers.
>
> If you never plan to scale out, there are probably other frameworks that
> are more suited (Akka HTTP would be a natural candidate, assuming you can
> connect your source/sink directly).
>
> However, I'd probably rather encourage you to consider scaling out as a
> natural component in your architecture. Data volume doubles roughly every
> 18 months, so unless you buy some very beefy machine, you will hit the
> limit sooner or later. From your description it sounds to me as if you kind
> of envision a throughput of at least 100K rec/s. Given that even in your
> stand-alone test with no additional transformations, you need 3-5 of your 8
> core machines to just perform the HTTP requests. So most likely you need a
> machine with more than 32 cores and that's the point where they get quickly
> expensive without offering you any fault tolerance. On the other hand, if
> you have a cluster of many smaller machines, you get a much more reliable
> environment that is overall cheaper.
>
> We unfortunately still need more time to fully incorporate dynamic
> scaling-in and out (ETA Flink 1.12 with Flink 1.11 currently being
> finalized), then you would be able to react on slower traffic (during
> night?) and peaks (noon, start/end of months) and get a very cost-efficient
> system.
>
> On Fri, Jun 12, 2020 at 10:59 AM Arti Pande <pa...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks for quick reply and totally agree with you on the differences
>> between microbenchmarks and a full benchmark with specific use-case. Thanks
>> for sending the microbenchmark screenshot.
>>
>> For our use-case, the streaming pipeline has five main transformations
>> that have business logic, of which Async IO to external API endpoint is one
>> operator. To create benchmarks for operators, I run the real pipeline with
>> full load on a single machine and note the Throughput and latency. Then add
>> each operator one by one; always keeping the other basic operators like
>> source, watermark generator, deserializer, sink etc turned on. The
>> intention is to build a sort of incremental realistic benchmark for each
>> operator for given use-case. Adding the AsyncIO operator (with parallelism
>> 7 and async buffer capacity 100) with AsyncHTTPClient library brings
>> throughput down from 102 K to a very low number i.e. 10K or 12 K
>> records/sec.
>>
>> As you suggested, we tested the library being used (AsyncHTTPClient)
>> independently of Flink, in a similar way to what AsyncIO does. A
>> simple java program that invokes millions of API calls in a loop, with
>> hard-coded POST request values, and limited (configurable) number of
>> concurrent (maxInFlight) requests.  AsyncHTTPClient library by default uses
>> nCores * 2 (= 16) IO threads, plus a fixed set of threads (say 2 or 4) for
>> the ExecutorService to be passed to the Listener of result Future. So with
>> this library the code requires at least 18 or 20 threads. Varying the
>> maxInFlightRequests from 100 to 3000 the throughput varied from 17 K to 34
>> K records/sec. Ofcourse this was with hard-coded POST request values and
>> discarding the response body on reading (no further processing on it).
>>
>> When we tried to vary the async buffer capacity of AsyncIO (equivalent of
>> maxInFlightRequests above) beyond 100, our throughput dropped further by
>> 20% to 30%. Whereas in the test program above we would get better
>> performance as we increased maxInFlightRequests from 100 to 3000.
>>
>> To reduce backpressure on upstream operators we had to increase the
>> AsyncIO operator parallelism upto 7. But that means at least 20*7 = 140
>> threads per single pipeline plus the threads of other operators in the
>> pipeline.
>>
>> The question therefore is, given the pipeline is highly multithreaded can
>> 8-core machines suit this? Also is Flink the right framework for such
>> multi-threaded streaming pipelines that have external API calls and
>> high-speed ingestion?
>>
>> Thanks & regards
>> Arti
>>
>>
>> On Thu, Jun 11, 2020 at 1:13 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Arti,
>>>
>>> you are now comparing a microbenchmark to a full benchmark. The job
>>> contains multiple transformations (each dividing the power of the core) and
>>> most importantly, it contains network traffic (shuffle) to perform a join.
>>> Unless you do heavy transformation, your throughput will always be
>>> bottlenecked with network traffic. At the very least, that contains
>>> additional serialization costs. And it contains heavy states through the
>>> window aggregation, which also results in serialization and I/O.
>>>
>>> I attached the screenshot from the microbenchmark. In general,
>>> microbenchmarks are not really useful for end-users, but you explicitly
>>> asked for them. The bottleneck usually arises from the IO part, which is
>>> NOT benchmarked. I'm assuming you are actually more interested in whether
>>> and how your use case can be solved in Flink rather than technical details.
>>> First of all, it always helps to have more information about the intended
>>> query than going directly into technical details. I gathered that you have
>>> a third-part microservice that you need to query and you want to do
>>> additional transformations. It would also be interesting how you performed
>>> your benchmarks and measured the performance drop.
>>>
>>> First of all, even though I discouraged the use of microservices in a
>>> stream processor, it doesn't mean that it's not possible. You just lose
>>> some of the nice properties that are possible. 1) Your latency will
>>> typically be much higher as if it was a data source. 2) You lose
>>> exactly-once behavior in regard to the HTTP endpoint. On recovery, requests
>>> since the last checkpoint will be repeated. You need to check if that makes
>>> sense if the microservices has side-effects that don't allow that. But the
>>> same is true for most other stream processors and can only be avoided by
>>> using per-record commits (and then even then, this in-progress record may
>>> result in duplicate HTTP queries on recovery). 3) If that external endpoint
>>> is down, there is no way to do meaningful processing. So you add the
>>> downtime of your Flink cluster and your external microservice. That becomes
>>> especially important if you have a wide-range of microservices.
>>>
>>> So as I wrote in last mail, I'd first establish a boundary independent
>>> of Flink, by running some Java program with your used async library and
>>> tune the settings to reach saturation on one machine. That boundary becomes
>>> your gold standard - there is no way Flink or any other stream processor
>>> can do it better. If you publish this number, we can jointly find good
>>> async IO settings. You can of course also see if other libraries are more
>>> suitable for your needs, ideally implementations that use fewer threads and
>>> more light-weight constructs to achieve the same construct. Then I'd also
>>> measure a histogram of response times. Do you have stragglers or are
>>> response times rather uniform? For uniformity, configuration of async I/O
>>> is usually quite straight-forward, just use a queue length that fully
>>> satures your async library (so maxConnections+X, where X is a small
>>> constant safety buffer). It also helps to determine if you need to scale
>>> out or not.
>>>
>>>
>>> On Thu, Jun 11, 2020 at 9:03 AM Arti Pande <pa...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> Thanks for a quick reply.
>>>>
>>>> The second reference link (
>>>> http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2)
>>>> from your answer is not accessible though. Could you share some more
>>>> numbers from it? Are these benchmarks published somewhere?
>>>>
>>>> Without actual IO call, Async IO operator benchmark of 1.6 K records/ms
>>>> per core translates to *1.6 million records/sec per core*. So an 8
>>>> core machine should give roughly *12.8 million records/sec* ? Is this
>>>> the correct number? How do we compare it with this benchmark
>>>> <https://www.ververica.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime>
>>>> article that talks about total throughput of 4 million records/sec (without
>>>> Async IO operator) in a cluster of about 10 machines with 16-core each?
>>>>
>>>> Ordered wait is indispensable for our use-case because we need to call
>>>> the external (partner organisation) system's API endpoint for each incoming
>>>> record. Depending on the response from that API we need to decide how to
>>>> process this record and order needs to be preserved. This may not have been
>>>> a problem if data ingestion rates were low. Real challenge is because of
>>>> the high-speed stream (millions of events per second) of input.
>>>>
>>>> Is higher core machines an answer or is Flink not suitable for
>>>> use-cases like this?
>>>>
>>>>
>>>>
>>>> On Thu, Jun 11, 2020 at 2:44 AM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Arti,
>>>>>
>>>>> microbenchmarks for AsyncIO are available [1] and the results shown in
>>>>> [2]. So you can roughly expect 1.6k records/ms per core to be the upper
>>>>> limit without any actual I/O. That range should hold for Flink 1.10 and
>>>>> coming Flink 1.11. I cannot say much about older versions and you didn't
>>>>> specify which you use. But it shouldn't be an order of magnitude different.
>>>>>
>>>>> The biggest performance improvement will probably be to switch to
>>>>> unordered - results are emitted as soon as they arrive. On ordered, the
>>>>> first element that came in, needs to be finished before any emission. If
>>>>> some elements take longer than others, these slow elements quickly become a
>>>>> bottleneck.
>>>>>
>>>>> If async I/O needs to be ordered, then you need to tweak what you
>>>>> already mentioned. Set DOP to the number of physical cores, there is no
>>>>> benefit in going higher. If you indeed use an async HTTP client, then the
>>>>> queue size should be a bit higher than the thread pool size. The thread
>>>>> pool size will effectively limit the parallelism per subtask and you want
>>>>> to saturate that from the Flink side. The thread pool size (together with
>>>>> maxConnections) will put the hard limit together with the request
>>>>> processing time on your application.
>>>>>
>>>>> I'd probably consider using more machines in your stead instead of
>>>>> more cores per machine (sounded like that is an option). So instead of
>>>>> using 10x12 cores, use 15x8 cores. You could measure how much max
>>>>> throughput to expect by using one machine and use a benchmarking tool that
>>>>> increases the requests per second on that machine until it hits the limit.
>>>>> Then you know how many machines you need at the very least.
>>>>>
>>>>> Finally, it might also be a good time to review your architecture.
>>>>> Microservices are not the best fit for a streaming application. For
>>>>> example, if this is a lookup service, it would scale and fit much better if
>>>>> all data could be ingested by Flink as an additional data source (e.g.
>>>>> Kafka topic). Existing microservices might be converted into such data
>>>>> sources with change-data-capture.
>>>>>
>>>>> [1]
>>>>> https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
>>>>> [2]
>>>>> http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2
>>>>>
>>>>> On Wed, Jun 10, 2020 at 10:06 PM Arti Pande <pa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> As Flink Async IO operator is designed for external API or DB calls,
>>>>>> are there any specific guidelines / tips for scaling up this operator?
>>>>>> Particularly for use-cases where incoming events are being ingested at a
>>>>>> very high-speed and the Async IO operator with orderedWait mode can
>>>>>> not keep up with that speed (although the target API endpoint it is calling
>>>>>> is load tested to provide much higher throughput with very minimal
>>>>>> latency). In our case adding Async IO operator to the pipeline *reduced
>>>>>> the throughput by 88% to 90%*. This is huge performance hit!
>>>>>>
>>>>>> We tried a couple of things:
>>>>>>
>>>>>>    1. Increasing the async buffer capacity parameter, there by
>>>>>>    increasing the number of concurrent requests at any given point in time
>>>>>>    that are waiting for response. This proved counter-productive beyond a very
>>>>>>    small number like 50 or 100.
>>>>>>    2. Increasing the operator parallelism. This does not help much
>>>>>>    as the number of cores on our machines are limited (8 or 12)
>>>>>>    3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
>>>>>>    maxConnections, maxConnectionsPerHost) and the size of
>>>>>>    FixedThreadPool used by the Listener of its Future. Again without much
>>>>>>    improvement.
>>>>>>
>>>>>> Our observation is that although Async IO operator works for one
>>>>>> stream element at a time, the operator and its underlying HTTP client are
>>>>>> multithreaded and need higher core machines for high-speed stream
>>>>>> processing. If the only machines available for this kind of applications
>>>>>> are 8 to 16 cores, we face challenges in meeting the required throughput
>>>>>> and latency SLAs.
>>>>>>
>>>>>> Are there any micro-benchmarks or tuning guidelines for using Async
>>>>>> IO for high-speed stream processing, so we know how much throughput
>>>>>> to expect from it?
>>>>>> Thanks & regards,
>>>>>> Arti
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Arvid Heise | Senior Java Developer
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Ververica GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>> Ji (Toni) Cheng
>>>>>
>>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Re: Flink Async IO operator tuning / micro-benchmarks

Posted by Arvid Heise <ar...@ververica.com>.
Hi Arti,

Thank you very much for providing so much information.

One additional test that you could do is to check how the pipeline performs
by mocking the actual HTTP request and directly return a static response
through Async IO. This would give you an exact number including potential
serialization costs. I often see users focusing on the performance of a
specific operator, while the bottleneck is the shuffle step before or after
that (you get a shuffle if you have keyby or change the degree of
parallelism).

But I'm going forward by the assumption that this is indeed completely the
AsyncIO itself that is causing the loss of performance. So for me, it looks
like Flink is a factor of 2 to 3 slower than the native approach (exact
number is hard to give, as there are more steps involved in Flink). My
guess is that the sheer number of threads might cause too many context
switches and the drop of performance. Are you starting an AsyncHTTPClient
thread pool per subtask or do you have it shared (by using a static
variable initialized once in RichAsyncFunction#open)? It sounded like the
former, so I'd definitely recommend trying the latter approach.

One additional thing that you could try is actually use a parallelism of 1
on that AsyncIO and just tweak that one thread pool according to your
non-Flink test. But that's usually harder to scale out.


To your final question, if it's the right fit: In general, the strength of
most distributed stream processors is distributed computing. In your case,
it will probably always happen that you will hit the limit on one machine
sooner or later. Then, it's the hour of Flink to shine and actually
distribute the work among multiple workers.

If you never plan to scale out, there are probably other frameworks that
are more suited (Akka HTTP would be a natural candidate, assuming you can
connect your source/sink directly).

However, I'd probably rather encourage you to consider scaling out as a
natural component in your architecture. Data volume doubles roughly every
18 months, so unless you buy some very beefy machine, you will hit the
limit sooner or later. From your description it sounds to me as if you kind
of envision a throughput of at least 100K rec/s. Given that even in your
stand-alone test with no additional transformations, you need 3-5 of your 8
core machines to just perform the HTTP requests. So most likely you need a
machine with more than 32 cores and that's the point where they get quickly
expensive without offering you any fault tolerance. On the other hand, if
you have a cluster of many smaller machines, you get a much more reliable
environment that is overall cheaper.

We unfortunately still need more time to fully incorporate dynamic
scaling-in and out (ETA Flink 1.12 with Flink 1.11 currently being
finalized), then you would be able to react on slower traffic (during
night?) and peaks (noon, start/end of months) and get a very cost-efficient
system.

On Fri, Jun 12, 2020 at 10:59 AM Arti Pande <pa...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks for quick reply and totally agree with you on the differences
> between microbenchmarks and a full benchmark with specific use-case. Thanks
> for sending the microbenchmark screenshot.
>
> For our use-case, the streaming pipeline has five main transformations
> that have business logic, of which Async IO to external API endpoint is one
> operator. To create benchmarks for operators, I run the real pipeline with
> full load on a single machine and note the Throughput and latency. Then add
> each operator one by one; always keeping the other basic operators like
> source, watermark generator, deserializer, sink etc turned on. The
> intention is to build a sort of incremental realistic benchmark for each
> operator for given use-case. Adding the AsyncIO operator (with parallelism
> 7 and async buffer capacity 100) with AsyncHTTPClient library brings
> throughput down from 102 K to a very low number i.e. 10K or 12 K
> records/sec.
>
> As you suggested, we tested the library being used (AsyncHTTPClient)
> independently of Flink, in a similar way to what AsyncIO does. A
> simple java program that invokes millions of API calls in a loop, with
> hard-coded POST request values, and limited (configurable) number of
> concurrent (maxInFlight) requests.  AsyncHTTPClient library by default uses
> nCores * 2 (= 16) IO threads, plus a fixed set of threads (say 2 or 4) for
> the ExecutorService to be passed to the Listener of result Future. So with
> this library the code requires at least 18 or 20 threads. Varying the
> maxInFlightRequests from 100 to 3000 the throughput varied from 17 K to 34
> K records/sec. Ofcourse this was with hard-coded POST request values and
> discarding the response body on reading (no further processing on it).
>
> When we tried to vary the async buffer capacity of AsyncIO (equivalent of
> maxInFlightRequests above) beyond 100, our throughput dropped further by
> 20% to 30%. Whereas in the test program above we would get better
> performance as we increased maxInFlightRequests from 100 to 3000.
>
> To reduce backpressure on upstream operators we had to increase the
> AsyncIO operator parallelism upto 7. But that means at least 20*7 = 140
> threads per single pipeline plus the threads of other operators in the
> pipeline.
>
> The question therefore is, given the pipeline is highly multithreaded can
> 8-core machines suit this? Also is Flink the right framework for such
> multi-threaded streaming pipelines that have external API calls and
> high-speed ingestion?
>
> Thanks & regards
> Arti
>
>
> On Thu, Jun 11, 2020 at 1:13 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Arti,
>>
>> you are now comparing a microbenchmark to a full benchmark. The job
>> contains multiple transformations (each dividing the power of the core) and
>> most importantly, it contains network traffic (shuffle) to perform a join.
>> Unless you do heavy transformation, your throughput will always be
>> bottlenecked with network traffic. At the very least, that contains
>> additional serialization costs. And it contains heavy states through the
>> window aggregation, which also results in serialization and I/O.
>>
>> I attached the screenshot from the microbenchmark. In general,
>> microbenchmarks are not really useful for end-users, but you explicitly
>> asked for them. The bottleneck usually arises from the IO part, which is
>> NOT benchmarked. I'm assuming you are actually more interested in whether
>> and how your use case can be solved in Flink rather than technical details.
>> First of all, it always helps to have more information about the intended
>> query than going directly into technical details. I gathered that you have
>> a third-part microservice that you need to query and you want to do
>> additional transformations. It would also be interesting how you performed
>> your benchmarks and measured the performance drop.
>>
>> First of all, even though I discouraged the use of microservices in a
>> stream processor, it doesn't mean that it's not possible. You just lose
>> some of the nice properties that are possible. 1) Your latency will
>> typically be much higher as if it was a data source. 2) You lose
>> exactly-once behavior in regard to the HTTP endpoint. On recovery, requests
>> since the last checkpoint will be repeated. You need to check if that makes
>> sense if the microservices has side-effects that don't allow that. But the
>> same is true for most other stream processors and can only be avoided by
>> using per-record commits (and then even then, this in-progress record may
>> result in duplicate HTTP queries on recovery). 3) If that external endpoint
>> is down, there is no way to do meaningful processing. So you add the
>> downtime of your Flink cluster and your external microservice. That becomes
>> especially important if you have a wide-range of microservices.
>>
>> So as I wrote in last mail, I'd first establish a boundary independent of
>> Flink, by running some Java program with your used async library and tune
>> the settings to reach saturation on one machine. That boundary becomes your
>> gold standard - there is no way Flink or any other stream processor can do
>> it better. If you publish this number, we can jointly find good async IO
>> settings. You can of course also see if other libraries are more suitable
>> for your needs, ideally implementations that use fewer threads and more
>> light-weight constructs to achieve the same construct. Then I'd also
>> measure a histogram of response times. Do you have stragglers or are
>> response times rather uniform? For uniformity, configuration of async I/O
>> is usually quite straight-forward, just use a queue length that fully
>> satures your async library (so maxConnections+X, where X is a small
>> constant safety buffer). It also helps to determine if you need to scale
>> out or not.
>>
>>
>> On Thu, Jun 11, 2020 at 9:03 AM Arti Pande <pa...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> Thanks for a quick reply.
>>>
>>> The second reference link (
>>> http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2)
>>> from your answer is not accessible though. Could you share some more
>>> numbers from it? Are these benchmarks published somewhere?
>>>
>>> Without actual IO call, Async IO operator benchmark of 1.6 K records/ms
>>> per core translates to *1.6 million records/sec per core*. So an 8 core
>>> machine should give roughly *12.8 million records/sec* ? Is this the
>>> correct number? How do we compare it with this benchmark
>>> <https://www.ververica.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime>
>>> article that talks about total throughput of 4 million records/sec (without
>>> Async IO operator) in a cluster of about 10 machines with 16-core each?
>>>
>>> Ordered wait is indispensable for our use-case because we need to call
>>> the external (partner organisation) system's API endpoint for each incoming
>>> record. Depending on the response from that API we need to decide how to
>>> process this record and order needs to be preserved. This may not have been
>>> a problem if data ingestion rates were low. Real challenge is because of
>>> the high-speed stream (millions of events per second) of input.
>>>
>>> Is higher core machines an answer or is Flink not suitable for use-cases
>>> like this?
>>>
>>>
>>>
>>> On Thu, Jun 11, 2020 at 2:44 AM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Arti,
>>>>
>>>> microbenchmarks for AsyncIO are available [1] and the results shown in
>>>> [2]. So you can roughly expect 1.6k records/ms per core to be the upper
>>>> limit without any actual I/O. That range should hold for Flink 1.10 and
>>>> coming Flink 1.11. I cannot say much about older versions and you didn't
>>>> specify which you use. But it shouldn't be an order of magnitude different.
>>>>
>>>> The biggest performance improvement will probably be to switch to
>>>> unordered - results are emitted as soon as they arrive. On ordered, the
>>>> first element that came in, needs to be finished before any emission. If
>>>> some elements take longer than others, these slow elements quickly become a
>>>> bottleneck.
>>>>
>>>> If async I/O needs to be ordered, then you need to tweak what you
>>>> already mentioned. Set DOP to the number of physical cores, there is no
>>>> benefit in going higher. If you indeed use an async HTTP client, then the
>>>> queue size should be a bit higher than the thread pool size. The thread
>>>> pool size will effectively limit the parallelism per subtask and you want
>>>> to saturate that from the Flink side. The thread pool size (together with
>>>> maxConnections) will put the hard limit together with the request
>>>> processing time on your application.
>>>>
>>>> I'd probably consider using more machines in your stead instead of more
>>>> cores per machine (sounded like that is an option). So instead of using
>>>> 10x12 cores, use 15x8 cores. You could measure how much max throughput to
>>>> expect by using one machine and use a benchmarking tool that increases the
>>>> requests per second on that machine until it hits the limit. Then you know
>>>> how many machines you need at the very least.
>>>>
>>>> Finally, it might also be a good time to review your architecture.
>>>> Microservices are not the best fit for a streaming application. For
>>>> example, if this is a lookup service, it would scale and fit much better if
>>>> all data could be ingested by Flink as an additional data source (e.g.
>>>> Kafka topic). Existing microservices might be converted into such data
>>>> sources with change-data-capture.
>>>>
>>>> [1]
>>>> https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
>>>> [2]
>>>> http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2
>>>>
>>>> On Wed, Jun 10, 2020 at 10:06 PM Arti Pande <pa...@gmail.com>
>>>> wrote:
>>>>
>>>>> As Flink Async IO operator is designed for external API or DB calls,
>>>>> are there any specific guidelines / tips for scaling up this operator?
>>>>> Particularly for use-cases where incoming events are being ingested at a
>>>>> very high-speed and the Async IO operator with orderedWait mode can
>>>>> not keep up with that speed (although the target API endpoint it is calling
>>>>> is load tested to provide much higher throughput with very minimal
>>>>> latency). In our case adding Async IO operator to the pipeline *reduced
>>>>> the throughput by 88% to 90%*. This is huge performance hit!
>>>>>
>>>>> We tried a couple of things:
>>>>>
>>>>>    1. Increasing the async buffer capacity parameter, there by
>>>>>    increasing the number of concurrent requests at any given point in time
>>>>>    that are waiting for response. This proved counter-productive beyond a very
>>>>>    small number like 50 or 100.
>>>>>    2. Increasing the operator parallelism. This does not help much as
>>>>>    the number of cores on our machines are limited (8 or 12)
>>>>>    3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
>>>>>    maxConnections, maxConnectionsPerHost) and the size of
>>>>>    FixedThreadPool used by the Listener of its Future. Again without much
>>>>>    improvement.
>>>>>
>>>>> Our observation is that although Async IO operator works for one
>>>>> stream element at a time, the operator and its underlying HTTP client are
>>>>> multithreaded and need higher core machines for high-speed stream
>>>>> processing. If the only machines available for this kind of applications
>>>>> are 8 to 16 cores, we face challenges in meeting the required throughput
>>>>> and latency SLAs.
>>>>>
>>>>> Are there any micro-benchmarks or tuning guidelines for using Async IO for
>>>>> high-speed stream processing, so we know how much throughput to expect from
>>>>> it?
>>>>> Thanks & regards,
>>>>> Arti
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng
>>>>
>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Flink Async IO operator tuning / micro-benchmarks

Posted by Arti Pande <pa...@gmail.com>.
Hi Arvid,

Thanks for quick reply and totally agree with you on the differences
between microbenchmarks and a full benchmark with specific use-case. Thanks
for sending the microbenchmark screenshot.

For our use-case, the streaming pipeline has five main transformations that
have business logic, of which Async IO to external API endpoint is one
operator. To create benchmarks for operators, I run the real pipeline with
full load on a single machine and note the Throughput and latency. Then add
each operator one by one; always keeping the other basic operators like
source, watermark generator, deserializer, sink etc turned on. The
intention is to build a sort of incremental realistic benchmark for each
operator for given use-case. Adding the AsyncIO operator (with parallelism
7 and async buffer capacity 100) with AsyncHTTPClient library brings
throughput down from 102 K to a very low number i.e. 10K or 12 K
records/sec.

As you suggested, we tested the library being used (AsyncHTTPClient)
independently of Flink, in a similar way to what AsyncIO does. A
simple java program that invokes millions of API calls in a loop, with
hard-coded POST request values, and limited (configurable) number of
concurrent (maxInFlight) requests.  AsyncHTTPClient library by default uses
nCores * 2 (= 16) IO threads, plus a fixed set of threads (say 2 or 4) for
the ExecutorService to be passed to the Listener of result Future. So with
this library the code requires at least 18 or 20 threads. Varying the
maxInFlightRequests from 100 to 3000 the throughput varied from 17 K to 34
K records/sec. Ofcourse this was with hard-coded POST request values and
discarding the response body on reading (no further processing on it).

When we tried to vary the async buffer capacity of AsyncIO (equivalent of
maxInFlightRequests above) beyond 100, our throughput dropped further by
20% to 30%. Whereas in the test program above we would get better
performance as we increased maxInFlightRequests from 100 to 3000.

To reduce backpressure on upstream operators we had to increase the AsyncIO
operator parallelism upto 7. But that means at least 20*7 = 140 threads per
single pipeline plus the threads of other operators in the pipeline.

The question therefore is, given the pipeline is highly multithreaded can
8-core machines suit this? Also is Flink the right framework for such
multi-threaded streaming pipelines that have external API calls and
high-speed ingestion?

Thanks & regards
Arti


On Thu, Jun 11, 2020 at 1:13 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Arti,
>
> you are now comparing a microbenchmark to a full benchmark. The job
> contains multiple transformations (each dividing the power of the core) and
> most importantly, it contains network traffic (shuffle) to perform a join.
> Unless you do heavy transformation, your throughput will always be
> bottlenecked with network traffic. At the very least, that contains
> additional serialization costs. And it contains heavy states through the
> window aggregation, which also results in serialization and I/O.
>
> I attached the screenshot from the microbenchmark. In general,
> microbenchmarks are not really useful for end-users, but you explicitly
> asked for them. The bottleneck usually arises from the IO part, which is
> NOT benchmarked. I'm assuming you are actually more interested in whether
> and how your use case can be solved in Flink rather than technical details.
> First of all, it always helps to have more information about the intended
> query than going directly into technical details. I gathered that you have
> a third-part microservice that you need to query and you want to do
> additional transformations. It would also be interesting how you performed
> your benchmarks and measured the performance drop.
>
> First of all, even though I discouraged the use of microservices in a
> stream processor, it doesn't mean that it's not possible. You just lose
> some of the nice properties that are possible. 1) Your latency will
> typically be much higher as if it was a data source. 2) You lose
> exactly-once behavior in regard to the HTTP endpoint. On recovery, requests
> since the last checkpoint will be repeated. You need to check if that makes
> sense if the microservices has side-effects that don't allow that. But the
> same is true for most other stream processors and can only be avoided by
> using per-record commits (and then even then, this in-progress record may
> result in duplicate HTTP queries on recovery). 3) If that external endpoint
> is down, there is no way to do meaningful processing. So you add the
> downtime of your Flink cluster and your external microservice. That becomes
> especially important if you have a wide-range of microservices.
>
> So as I wrote in last mail, I'd first establish a boundary independent of
> Flink, by running some Java program with your used async library and tune
> the settings to reach saturation on one machine. That boundary becomes your
> gold standard - there is no way Flink or any other stream processor can do
> it better. If you publish this number, we can jointly find good async IO
> settings. You can of course also see if other libraries are more suitable
> for your needs, ideally implementations that use fewer threads and more
> light-weight constructs to achieve the same construct. Then I'd also
> measure a histogram of response times. Do you have stragglers or are
> response times rather uniform? For uniformity, configuration of async I/O
> is usually quite straight-forward, just use a queue length that fully
> satures your async library (so maxConnections+X, where X is a small
> constant safety buffer). It also helps to determine if you need to scale
> out or not.
>
>
> On Thu, Jun 11, 2020 at 9:03 AM Arti Pande <pa...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks for a quick reply.
>>
>> The second reference link (
>> http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2)
>> from your answer is not accessible though. Could you share some more
>> numbers from it? Are these benchmarks published somewhere?
>>
>> Without actual IO call, Async IO operator benchmark of 1.6 K records/ms
>> per core translates to *1.6 million records/sec per core*. So an 8 core
>> machine should give roughly *12.8 million records/sec* ? Is this the
>> correct number? How do we compare it with this benchmark
>> <https://www.ververica.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime>
>> article that talks about total throughput of 4 million records/sec (without
>> Async IO operator) in a cluster of about 10 machines with 16-core each?
>>
>> Ordered wait is indispensable for our use-case because we need to call
>> the external (partner organisation) system's API endpoint for each incoming
>> record. Depending on the response from that API we need to decide how to
>> process this record and order needs to be preserved. This may not have been
>> a problem if data ingestion rates were low. Real challenge is because of
>> the high-speed stream (millions of events per second) of input.
>>
>> Is higher core machines an answer or is Flink not suitable for use-cases
>> like this?
>>
>>
>>
>> On Thu, Jun 11, 2020 at 2:44 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Arti,
>>>
>>> microbenchmarks for AsyncIO are available [1] and the results shown in
>>> [2]. So you can roughly expect 1.6k records/ms per core to be the upper
>>> limit without any actual I/O. That range should hold for Flink 1.10 and
>>> coming Flink 1.11. I cannot say much about older versions and you didn't
>>> specify which you use. But it shouldn't be an order of magnitude different.
>>>
>>> The biggest performance improvement will probably be to switch to
>>> unordered - results are emitted as soon as they arrive. On ordered, the
>>> first element that came in, needs to be finished before any emission. If
>>> some elements take longer than others, these slow elements quickly become a
>>> bottleneck.
>>>
>>> If async I/O needs to be ordered, then you need to tweak what you
>>> already mentioned. Set DOP to the number of physical cores, there is no
>>> benefit in going higher. If you indeed use an async HTTP client, then the
>>> queue size should be a bit higher than the thread pool size. The thread
>>> pool size will effectively limit the parallelism per subtask and you want
>>> to saturate that from the Flink side. The thread pool size (together with
>>> maxConnections) will put the hard limit together with the request
>>> processing time on your application.
>>>
>>> I'd probably consider using more machines in your stead instead of more
>>> cores per machine (sounded like that is an option). So instead of using
>>> 10x12 cores, use 15x8 cores. You could measure how much max throughput to
>>> expect by using one machine and use a benchmarking tool that increases the
>>> requests per second on that machine until it hits the limit. Then you know
>>> how many machines you need at the very least.
>>>
>>> Finally, it might also be a good time to review your architecture.
>>> Microservices are not the best fit for a streaming application. For
>>> example, if this is a lookup service, it would scale and fit much better if
>>> all data could be ingested by Flink as an additional data source (e.g.
>>> Kafka topic). Existing microservices might be converted into such data
>>> sources with change-data-capture.
>>>
>>> [1]
>>> https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
>>> [2]
>>> http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2
>>>
>>> On Wed, Jun 10, 2020 at 10:06 PM Arti Pande <pa...@gmail.com>
>>> wrote:
>>>
>>>> As Flink Async IO operator is designed for external API or DB calls,
>>>> are there any specific guidelines / tips for scaling up this operator?
>>>> Particularly for use-cases where incoming events are being ingested at a
>>>> very high-speed and the Async IO operator with orderedWait mode can
>>>> not keep up with that speed (although the target API endpoint it is calling
>>>> is load tested to provide much higher throughput with very minimal
>>>> latency). In our case adding Async IO operator to the pipeline *reduced
>>>> the throughput by 88% to 90%*. This is huge performance hit!
>>>>
>>>> We tried a couple of things:
>>>>
>>>>    1. Increasing the async buffer capacity parameter, there by
>>>>    increasing the number of concurrent requests at any given point in time
>>>>    that are waiting for response. This proved counter-productive beyond a very
>>>>    small number like 50 or 100.
>>>>    2. Increasing the operator parallelism. This does not help much as
>>>>    the number of cores on our machines are limited (8 or 12)
>>>>    3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
>>>>    maxConnections, maxConnectionsPerHost) and the size of
>>>>    FixedThreadPool used by the Listener of its Future. Again without much
>>>>    improvement.
>>>>
>>>> Our observation is that although Async IO operator works for one
>>>> stream element at a time, the operator and its underlying HTTP client are
>>>> multithreaded and need higher core machines for high-speed stream
>>>> processing. If the only machines available for this kind of applications
>>>> are 8 to 16 cores, we face challenges in meeting the required throughput
>>>> and latency SLAs.
>>>>
>>>> Are there any micro-benchmarks or tuning guidelines for using Async IO for
>>>> high-speed stream processing, so we know how much throughput to expect from
>>>> it?
>>>> Thanks & regards,
>>>> Arti
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Re: Flink Async IO operator tuning / micro-benchmarks

Posted by Arvid Heise <ar...@ververica.com>.
Hi Arti,

you are now comparing a microbenchmark to a full benchmark. The job
contains multiple transformations (each dividing the power of the core) and
most importantly, it contains network traffic (shuffle) to perform a join.
Unless you do heavy transformation, your throughput will always be
bottlenecked with network traffic. At the very least, that contains
additional serialization costs. And it contains heavy states through the
window aggregation, which also results in serialization and I/O.

I attached the screenshot from the microbenchmark. In general,
microbenchmarks are not really useful for end-users, but you explicitly
asked for them. The bottleneck usually arises from the IO part, which is
NOT benchmarked. I'm assuming you are actually more interested in whether
and how your use case can be solved in Flink rather than technical details.
First of all, it always helps to have more information about the intended
query than going directly into technical details. I gathered that you have
a third-part microservice that you need to query and you want to do
additional transformations. It would also be interesting how you performed
your benchmarks and measured the performance drop.

First of all, even though I discouraged the use of microservices in a
stream processor, it doesn't mean that it's not possible. You just lose
some of the nice properties that are possible. 1) Your latency will
typically be much higher as if it was a data source. 2) You lose
exactly-once behavior in regard to the HTTP endpoint. On recovery, requests
since the last checkpoint will be repeated. You need to check if that makes
sense if the microservices has side-effects that don't allow that. But the
same is true for most other stream processors and can only be avoided by
using per-record commits (and then even then, this in-progress record may
result in duplicate HTTP queries on recovery). 3) If that external endpoint
is down, there is no way to do meaningful processing. So you add the
downtime of your Flink cluster and your external microservice. That becomes
especially important if you have a wide-range of microservices.

So as I wrote in last mail, I'd first establish a boundary independent of
Flink, by running some Java program with your used async library and tune
the settings to reach saturation on one machine. That boundary becomes your
gold standard - there is no way Flink or any other stream processor can do
it better. If you publish this number, we can jointly find good async IO
settings. You can of course also see if other libraries are more suitable
for your needs, ideally implementations that use fewer threads and more
light-weight constructs to achieve the same construct. Then I'd also
measure a histogram of response times. Do you have stragglers or are
response times rather uniform? For uniformity, configuration of async I/O
is usually quite straight-forward, just use a queue length that fully
satures your async library (so maxConnections+X, where X is a small
constant safety buffer). It also helps to determine if you need to scale
out or not.


On Thu, Jun 11, 2020 at 9:03 AM Arti Pande <pa...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks for a quick reply.
>
> The second reference link (
> http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2)
> from your answer is not accessible though. Could you share some more
> numbers from it? Are these benchmarks published somewhere?
>
> Without actual IO call, Async IO operator benchmark of 1.6 K records/ms
> per core translates to *1.6 million records/sec per core*. So an 8 core
> machine should give roughly *12.8 million records/sec* ? Is this the
> correct number? How do we compare it with this benchmark
> <https://www.ververica.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime>
> article that talks about total throughput of 4 million records/sec (without
> Async IO operator) in a cluster of about 10 machines with 16-core each?
>
> Ordered wait is indispensable for our use-case because we need to call the
> external (partner organisation) system's API endpoint for each incoming
> record. Depending on the response from that API we need to decide how to
> process this record and order needs to be preserved. This may not have been
> a problem if data ingestion rates were low. Real challenge is because of
> the high-speed stream (millions of events per second) of input.
>
> Is higher core machines an answer or is Flink not suitable for use-cases
> like this?
>
>
>
> On Thu, Jun 11, 2020 at 2:44 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Arti,
>>
>> microbenchmarks for AsyncIO are available [1] and the results shown in
>> [2]. So you can roughly expect 1.6k records/ms per core to be the upper
>> limit without any actual I/O. That range should hold for Flink 1.10 and
>> coming Flink 1.11. I cannot say much about older versions and you didn't
>> specify which you use. But it shouldn't be an order of magnitude different.
>>
>> The biggest performance improvement will probably be to switch to
>> unordered - results are emitted as soon as they arrive. On ordered, the
>> first element that came in, needs to be finished before any emission. If
>> some elements take longer than others, these slow elements quickly become a
>> bottleneck.
>>
>> If async I/O needs to be ordered, then you need to tweak what you already
>> mentioned. Set DOP to the number of physical cores, there is no benefit in
>> going higher. If you indeed use an async HTTP client, then the queue size
>> should be a bit higher than the thread pool size. The thread pool size will
>> effectively limit the parallelism per subtask and you want to saturate that
>> from the Flink side. The thread pool size (together with maxConnections)
>> will put the hard limit together with the request processing time on your
>> application.
>>
>> I'd probably consider using more machines in your stead instead of more
>> cores per machine (sounded like that is an option). So instead of using
>> 10x12 cores, use 15x8 cores. You could measure how much max throughput to
>> expect by using one machine and use a benchmarking tool that increases the
>> requests per second on that machine until it hits the limit. Then you know
>> how many machines you need at the very least.
>>
>> Finally, it might also be a good time to review your architecture.
>> Microservices are not the best fit for a streaming application. For
>> example, if this is a lookup service, it would scale and fit much better if
>> all data could be ingested by Flink as an additional data source (e.g.
>> Kafka topic). Existing microservices might be converted into such data
>> sources with change-data-capture.
>>
>> [1]
>> https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
>> [2] http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2
>>
>> On Wed, Jun 10, 2020 at 10:06 PM Arti Pande <pa...@gmail.com> wrote:
>>
>>> As Flink Async IO operator is designed for external API or DB calls,
>>> are there any specific guidelines / tips for scaling up this operator?
>>> Particularly for use-cases where incoming events are being ingested at a
>>> very high-speed and the Async IO operator with orderedWait mode can not
>>> keep up with that speed (although the target API endpoint it is calling is
>>> load tested to provide much higher throughput with very minimal latency).
>>> In our case adding Async IO operator to the pipeline *reduced the
>>> throughput by 88% to 90%*. This is huge performance hit!
>>>
>>> We tried a couple of things:
>>>
>>>    1. Increasing the async buffer capacity parameter, there by
>>>    increasing the number of concurrent requests at any given point in time
>>>    that are waiting for response. This proved counter-productive beyond a very
>>>    small number like 50 or 100.
>>>    2. Increasing the operator parallelism. This does not help much as
>>>    the number of cores on our machines are limited (8 or 12)
>>>    3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
>>>    maxConnections, maxConnectionsPerHost) and the size of
>>>    FixedThreadPool used by the Listener of its Future. Again without much
>>>    improvement.
>>>
>>> Our observation is that although Async IO operator works for one stream
>>> element at a time, the operator and its underlying HTTP client are
>>> multithreaded and need higher core machines for high-speed stream
>>> processing. If the only machines available for this kind of applications
>>> are 8 to 16 cores, we face challenges in meeting the required throughput
>>> and latency SLAs.
>>>
>>> Are there any micro-benchmarks or tuning guidelines for using Async IO for
>>> high-speed stream processing, so we know how much throughput to expect from
>>> it?
>>> Thanks & regards,
>>> Arti
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Flink Async IO operator tuning / micro-benchmarks

Posted by Arti Pande <pa...@gmail.com>.
Hi Arvid,

Thanks for a quick reply.

The second reference link (
http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2) from
your answer is not accessible though. Could you share some more numbers
from it? Are these benchmarks published somewhere?

Without actual IO call, Async IO operator benchmark of 1.6 K records/ms per
core translates to *1.6 million records/sec per core*. So an 8 core machine
should give roughly *12.8 million records/sec* ? Is this the correct
number? How do we compare it with this benchmark
<https://www.ververica.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime>
article that talks about total throughput of 4 million records/sec (without
Async IO operator) in a cluster of about 10 machines with 16-core each?

Ordered wait is indispensable for our use-case because we need to call the
external (partner organisation) system's API endpoint for each incoming
record. Depending on the response from that API we need to decide how to
process this record and order needs to be preserved. This may not have been
a problem if data ingestion rates were low. Real challenge is because of
the high-speed stream (millions of events per second) of input.

Is higher core machines an answer or is Flink not suitable for use-cases
like this?



On Thu, Jun 11, 2020 at 2:44 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Arti,
>
> microbenchmarks for AsyncIO are available [1] and the results shown in
> [2]. So you can roughly expect 1.6k records/ms per core to be the upper
> limit without any actual I/O. That range should hold for Flink 1.10 and
> coming Flink 1.11. I cannot say much about older versions and you didn't
> specify which you use. But it shouldn't be an order of magnitude different.
>
> The biggest performance improvement will probably be to switch to
> unordered - results are emitted as soon as they arrive. On ordered, the
> first element that came in, needs to be finished before any emission. If
> some elements take longer than others, these slow elements quickly become a
> bottleneck.
>
> If async I/O needs to be ordered, then you need to tweak what you already
> mentioned. Set DOP to the number of physical cores, there is no benefit in
> going higher. If you indeed use an async HTTP client, then the queue size
> should be a bit higher than the thread pool size. The thread pool size will
> effectively limit the parallelism per subtask and you want to saturate that
> from the Flink side. The thread pool size (together with maxConnections)
> will put the hard limit together with the request processing time on your
> application.
>
> I'd probably consider using more machines in your stead instead of more
> cores per machine (sounded like that is an option). So instead of using
> 10x12 cores, use 15x8 cores. You could measure how much max throughput to
> expect by using one machine and use a benchmarking tool that increases the
> requests per second on that machine until it hits the limit. Then you know
> how many machines you need at the very least.
>
> Finally, it might also be a good time to review your architecture.
> Microservices are not the best fit for a streaming application. For
> example, if this is a lookup service, it would scale and fit much better if
> all data could be ingested by Flink as an additional data source (e.g.
> Kafka topic). Existing microservices might be converted into such data
> sources with change-data-capture.
>
> [1]
> https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
> [2] http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2
>
> On Wed, Jun 10, 2020 at 10:06 PM Arti Pande <pa...@gmail.com> wrote:
>
>> As Flink Async IO operator is designed for external API or DB calls, are
>> there any specific guidelines / tips for scaling up this operator?
>> Particularly for use-cases where incoming events are being ingested at a
>> very high-speed and the Async IO operator with orderedWait mode can not
>> keep up with that speed (although the target API endpoint it is calling is
>> load tested to provide much higher throughput with very minimal latency).
>> In our case adding Async IO operator to the pipeline *reduced the
>> throughput by 88% to 90%*. This is huge performance hit!
>>
>> We tried a couple of things:
>>
>>    1. Increasing the async buffer capacity parameter, there by
>>    increasing the number of concurrent requests at any given point in time
>>    that are waiting for response. This proved counter-productive beyond a very
>>    small number like 50 or 100.
>>    2. Increasing the operator parallelism. This does not help much as
>>    the number of cores on our machines are limited (8 or 12)
>>    3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
>>    maxConnections, maxConnectionsPerHost) and the size of
>>    FixedThreadPool used by the Listener of its Future. Again without much
>>    improvement.
>>
>> Our observation is that although Async IO operator works for one stream
>> element at a time, the operator and its underlying HTTP client are
>> multithreaded and need higher core machines for high-speed stream
>> processing. If the only machines available for this kind of applications
>> are 8 to 16 cores, we face challenges in meeting the required throughput
>> and latency SLAs.
>>
>> Are there any micro-benchmarks or tuning guidelines for using Async IO for
>> high-speed stream processing, so we know how much throughput to expect from
>> it?
>> Thanks & regards,
>> Arti
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Re: Flink Async IO operator tuning / micro-benchmarks

Posted by Arvid Heise <ar...@ververica.com>.
Hi Arti,

microbenchmarks for AsyncIO are available [1] and the results shown in [2].
So you can roughly expect 1.6k records/ms per core to be the upper limit
without any actual I/O. That range should hold for Flink 1.10 and coming
Flink 1.11. I cannot say much about older versions and you didn't specify
which you use. But it shouldn't be an order of magnitude different.

The biggest performance improvement will probably be to switch to unordered
- results are emitted as soon as they arrive. On ordered, the first element
that came in, needs to be finished before any emission. If some elements
take longer than others, these slow elements quickly become a bottleneck.

If async I/O needs to be ordered, then you need to tweak what you already
mentioned. Set DOP to the number of physical cores, there is no benefit in
going higher. If you indeed use an async HTTP client, then the queue size
should be a bit higher than the thread pool size. The thread pool size will
effectively limit the parallelism per subtask and you want to saturate that
from the Flink side. The thread pool size (together with maxConnections)
will put the hard limit together with the request processing time on your
application.

I'd probably consider using more machines in your stead instead of more
cores per machine (sounded like that is an option). So instead of using
10x12 cores, use 15x8 cores. You could measure how much max throughput to
expect by using one machine and use a benchmarking tool that increases the
requests per second on that machine until it hits the limit. Then you know
how many machines you need at the very least.

Finally, it might also be a good time to review your architecture.
Microservices are not the best fit for a streaming application. For
example, if this is a lookup service, it would scale and fit much better if
all data could be ingested by Flink as an additional data source (e.g.
Kafka topic). Existing microservices might be converted into such data
sources with change-data-capture.

[1]
https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
[2] http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2

On Wed, Jun 10, 2020 at 10:06 PM Arti Pande <pa...@gmail.com> wrote:

> As Flink Async IO operator is designed for external API or DB calls, are
> there any specific guidelines / tips for scaling up this operator?
> Particularly for use-cases where incoming events are being ingested at a
> very high-speed and the Async IO operator with orderedWait mode can not
> keep up with that speed (although the target API endpoint it is calling is
> load tested to provide much higher throughput with very minimal latency).
> In our case adding Async IO operator to the pipeline *reduced the
> throughput by 88% to 90%*. This is huge performance hit!
>
> We tried a couple of things:
>
>    1. Increasing the async buffer capacity parameter, there by increasing
>    the number of concurrent requests at any given point in time that are
>    waiting for response. This proved counter-productive beyond a very small
>    number like 50 or 100.
>    2. Increasing the operator parallelism. This does not help much as the
>    number of cores on our machines are limited (8 or 12)
>    3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
>    maxConnections, maxConnectionsPerHost) and the size of FixedThreadPool
>    used by the Listener of its Future. Again without much improvement.
>
> Our observation is that although Async IO operator works for one stream
> element at a time, the operator and its underlying HTTP client are
> multithreaded and need higher core machines for high-speed stream
> processing. If the only machines available for this kind of applications
> are 8 to 16 cores, we face challenges in meeting the required throughput
> and latency SLAs.
>
> Are there any micro-benchmarks or tuning guidelines for using Async IO for
> high-speed stream processing, so we know how much throughput to expect from
> it?
> Thanks & regards,
> Arti
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng