Posted to user@flink.apache.org by Rion Williams <ri...@gmail.com> on 2021/03/19 20:04:50 UTC
Capturing Statement Execution / Results within JdbcSink
Hey all,
I've been working with JdbcSink and it's really made my life much easier,
but I had two questions about it that folks might be able to answer or
provide some clarity around.
*Accessing Statement Execution / Results*
Is there any mechanism in place (or out of the box) to support reading the
results of statements executed by the JdbcSink or would I need to implement
my own to support it?
The problem I'm trying to solve relates to observability (i.e. metrics):
incrementing specific counters based on the result of a given statement's
execution. For example, if I need to upsert 40 incoming widgets, some of
which may be duplicates, I only want to increment my metric for widgets
that didn't already exist, which I could determine from the responses to
the underlying queries.
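For what it's worth, the "count only genuinely new widgets" part can be sketched independently of the database. The sketch below is plain Java with no Flink or JDBC involved; the class, the string widget ids, and the counter are all hypothetical, and it simulates upsert semantics with an in-memory set, incrementing the counter only when the upsert would have been an INSERT rather than an UPDATE:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UpsertMetricSketch {
    private final Set<String> seenWidgetIds = new HashSet<>();
    private long newWidgetCounter = 0;

    // Returns true when the widget did not exist before, i.e. the
    // upsert would have been an INSERT rather than an UPDATE.
    public boolean upsert(String widgetId) {
        boolean inserted = seenWidgetIds.add(widgetId);
        if (inserted) {
            newWidgetCounter++; // only count genuinely new widgets
        }
        return inserted;
    }

    public long getNewWidgetCounter() {
        return newWidgetCounter;
    }

    public static void main(String[] args) {
        UpsertMetricSketch sketch = new UpsertMetricSketch();
        // 5 upserts, but only 3 distinct widget ids
        for (String id : List.of("a", "b", "a", "c", "b")) {
            sketch.upsert(id);
        }
        System.out.println(sketch.getNewWidgetCounter()); // prints 3
    }
}
```

Of course this only works if the sink operator itself can see enough state to know what's new, which is exactly why access to the statement's update counts would be nicer.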
*Batching Mechanisms (withBatchIntervalMs & withBatchSize)*
This was another great feature that I was happy to see since I didn't want
to handle writing my own windowing logic for something as trivial as this.
I noticed some odd behaviors when I attempted to implement this being
driven by configuration:
private fun getJdbcExecutionOptions(parameters: ParameterTool): JdbcExecutionOptions {
    var executionOptions = JdbcExecutionOptions.builder()

    if (parameters.getBoolean("database.batching.enabled", false)) {
        if (parameters.has("database.batching.ms")) {
            val batchingIntervalMs = parameters.getLong("database.batching.ms")
            executionOptions = executionOptions
                .withBatchIntervalMs(batchingIntervalMs)
        }

        if (parameters.has("database.batching.records")) {
            val batchingRecords = parameters.getInt("database.batching.records")
            executionOptions = executionOptions
                .withBatchSize(batchingRecords)
        }
    }

    return executionOptions.build()
}
With settings of 60000 (batchIntervalMs) and 100 (batchSize), it took
around 7-8 minutes before a write to the destination took place; however,
when previously using just the batchIntervalMs configuration, I'd
consistently see it write out once a minute.
I was looking through the source and it seems the time-based emissions are
scheduled asynchronously. I may have missed something, but I didn't
explicitly see anywhere that a records-based emission would affect the
scheduled emission.
I'm just trying to confirm whether these work together as an OR operation
(i.e. flush the pending records once a given number of records have been
seen or once the time interval has elapsed).
Thanks so much, you folks have been an incredible community in my short
time here and I've enjoyed working with Flink, contributing, and I hope to
continue to do much more!
Rion
Re: Capturing Statement Execution / Results within JdbcSink
Posted by Roman Khachatryan <ro...@apache.org>.
Hey Rion,
Regarding
> Accessing Statement Execution / Results,
There is currently no way to get the update counts back from the
database, unfortunately.
As for the
> Batching Mechanisms (withBatchIntervalMs & withBatchSize),
These parameters should have "OR" semantics: the database should be
updated whichever happens first. The decision to flush based on the
current batch size is made in JdbcBatchingOutputFormat#writeRecord
[1].
[1]
https://github.com/apache/flink/blob/c0cf91ce6fbe55f9f1df1fb1626a50e79cb9fc50/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java#L166
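To illustrate the "whichever happens first" behaviour (this is a simplified sketch, not the actual connector code): the class and method names below are hypothetical, and the interval timer is modeled as an explicit tick() call rather than the scheduled executor the real connector uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified sketch of "flush on size OR interval" semantics.
public class BatchingBuffer<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> flusher;

    public BatchingBuffer(int batchSize, Consumer<List<T>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Called per record: flushes immediately once the size threshold is hit.
    public void writeRecord(T record) {
        buffer.add(record);
        if (batchSize > 0 && buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called by the (simulated) interval timer: flushes whatever is pending.
    public void tick() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With batchSize = 100 and a tick every 60s, a burst of 100 records flushes immediately without waiting for the tick, while a trickle of fewer records still flushes on each tick, i.e. OR semantics: whichever condition fires first triggers the flush.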
Thanks from the community for using Flink and for your feedback =)
Regards,
Roman
On Fri, Mar 19, 2021 at 9:05 PM Rion Williams <ri...@gmail.com> wrote: