Posted to user@flink.apache.org by Kurtis Walker <ku...@sugarcrm.com> on 2021/04/29 16:11:11 UTC

Backpressure configuration

Hello,
  I’m building a POC for a Flink app that loads data from Kafka into a Greenplum data warehouse. I’ve built a custom sink that bulk loads a batch of rows. I’m using a global window, triggered on record count, to collect the rows for each sink invocation. I’m finding that while the sink is running, the upstream operators of my app stop processing messages until the sink operation completes. I guess this is the backpressure logic kicking in. The cost is that I get about 60% of the throughput that is theoretically possible. Is there any configuration that would let me control that backpressure so that Flink will buffer rows when it encounters backpressure? Ideally, when a sink operation completes, the next batch of rows would be ready for the sink to pick up immediately. Here’s my code:

        env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
                .keyBy((KeySelector<Envelope, String>) value -> value.getPayload().getSource().getTable())
                .window(GlobalWindows.create())
                .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
                .aggregate(new ListAggregator())
                .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions))).name("copysink");


Thanks!

Kurt


Re: Backpressure configuration

Posted by Kurtis Walker <ku...@sugarcrm.com>.
Thanks, Roman. I had already tried disableChaining; it didn’t have any effect. The built-in JDBC sink is really slow compared to a bulk load (close to 100x slower), but I had tested it and saw the same issue. When a given message triggers the JDBC sink to write a batch, everything else waits for it.

From: Roman Khachatryan <ro...@apache.org>
Date: Thursday, April 29, 2021 at 2:49 PM
To: Kurtis Walker <ku...@sugarcrm.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Backpressure configuration

Hello Kurt,

Assuming that your sink is blocking, I would first make sure that it
is not chained with the preceding operators. Otherwise, the same
thread will output data and perform windowing/triggering.
You can add disableChaining after addSink to prevent this [1].

Besides that, you probably could use existing JDBC batching
functionality by configuring JdbcExecutionOptions [2] and providing it
to JdbcSink.sink() [3].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.Builder.html#withBatchSize-int-
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html

Regards,
Roman


Re: Backpressure configuration

Posted by Roman Khachatryan <ro...@apache.org>.
Hello Kurt,

Assuming that your sink is blocking, I would first make sure that it
is not chained with the preceding operators. Otherwise, the same
thread will output data and perform windowing/triggering.
You can add disableChaining after addSink to prevent this [1].
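
To illustrate the point about threads, here is a minimal plain-Java sketch (not Flink code) of what separating the sink from the windowing thread amounts to: batches are handed over through a bounded buffer, so the producing thread keeps working while the sink blocks, and stalls only once the buffer is full. The class name, method name, and parameters are all hypothetical; in Flink the handoff between unchained tasks is done by its own bounded buffers rather than by a queue like this one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class HandoffSketch {
    // Marker batch that tells the sink thread to stop (compared by identity).
    private static final List<Integer> POISON = new ArrayList<>();

    /**
     * Builds `batches` batches of `batchSize` rows on the calling thread and
     * hands them to a slow sink running on its own thread.
     * Returns the total number of rows the sink wrote.
     */
    static int run(int batches, int batchSize, int bufferedBatches, long sinkMillis) {
        BlockingQueue<List<Integer>> handoff = new ArrayBlockingQueue<>(bufferedBatches);
        int[] written = {0};

        Thread sink = new Thread(() -> {
            try {
                while (true) {
                    List<Integer> batch = handoff.take();
                    if (batch == POISON) {
                        return;
                    }
                    Thread.sleep(sinkMillis); // stands in for the blocking bulk load
                    written[0] += batch.size();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sink.start();

        try {
            for (int b = 0; b < batches; b++) {
                List<Integer> batch = new ArrayList<>(batchSize);
                for (int i = 0; i < batchSize; i++) {
                    batch.add(b * batchSize + i);
                }
                // Blocks only when the buffer is full: this is the backpressure point.
                handoff.put(batch);
            }
            handoff.put(POISON);
            sink.join(); // join() makes the sink thread's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while handing off batches", e);
        }
        return written[0];
    }

    public static void main(String[] args) {
        System.out.println(run(4, 5000, 1, 10));
    }
}
```

With a buffer of one batch, the producer can assemble the next batch while the sink is still writing the previous one; a larger `bufferedBatches` only helps if the sink is faster than the producer on average.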

Besides that, you probably could use existing JDBC batching
functionality by configuring JdbcExecutionOptions [2] and providing it
to JdbcSink.sink() [3].
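
As a rough sketch of both suggestions together, the wiring could look like the following. It assumes flink-streaming-java and flink-connector-jdbc (from a release of that period) plus a PostgreSQL-compatible driver on the classpath, and a reachable database; the table name, SQL statement, JDBC URL, job name, and the stand-in source are hypothetical placeholders, not taken from the original job. The batch size and interval simply mirror the 5000-record and one-minute values in the trigger from the question.

```java
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcBatchingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c") // stand-in for the Kafka source
            .addSink(JdbcSink.<String>sink(
                // Hypothetical table and column.
                "INSERT INTO events (payload) VALUES (?)",
                (statement, value) -> statement.setString(1, value),
                JdbcExecutionOptions.builder()
                    .withBatchSize(5000)          // flush after this many records...
                    .withBatchIntervalMs(60_000)  // ...or after this many milliseconds
                    .withMaxRetries(3)
                    .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://localhost:5432/example") // hypothetical URL
                    .withDriverName("org.postgresql.Driver")
                    .build()))
            .name("jdbc-sink")
            .disableChaining(); // keep the sink out of the upstream operator chain

        env.execute("jdbc-batching-sketch");
    }
}
```

Here the sink does the batching itself, so no window or trigger is needed in front of it.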

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.Builder.html#withBatchSize-int-
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html

Regards,
Roman
