Posted to user@flink.apache.org by Jing Lu <aj...@gmail.com> on 2021/12/07 21:51:30 UTC

How to write from Flink to a write throttled database?

Hi, community

I have a Kafka stream and want to use Flink for a 10-minute aggregation.
However, the number of events is large, and writes to the output database
are throttled for a few seconds every hour. I was thinking of writing from
Flink to another Kafka topic and using a second Flink app to write to the
database. Will this smooth out the writes? What should I do in the second
Flink app?
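
For context, the second app I have in mind would only consume the
pre-aggregated topic and forward records to the database sink, roughly like
the sketch below (Scala, Flink 1.12; the topic name and the JSON-to-Row
mapping are placeholders, and the DynamoDB sink would be built exactly as in
the configuration shown later in this thread):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.types.Row

import com.klarna.flink.connectors.dynamodb.FlinkDynamoDBSink

object AggregatesToDynamoDB {
  // The sink is passed in so it can be constructed the same way as the
  // FlinkDynamoDBSink[Row] configuration quoted later in this thread.
  def buildPipeline(env: StreamExecutionEnvironment,
                    kafkaProps: Properties,
                    dynamoSink: FlinkDynamoDBSink[Row]): Unit = {
    env
      .addSource(new FlinkKafkaConsumer[String](
        "aggregated-events",          // placeholder: the intermediate topic
        new SimpleStringSchema(),
        kafkaProps))
      .map(json => Row.of(json))      // placeholder: parse the aggregate into a Row
      .addSink(dynamoSink)            // the sink's batching is what smooths the writes
  }
}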


Thanks,
Jing

Re: How to write from Flink to a write throttled database?

Posted by Jing Lu <aj...@gmail.com>.
The writes may be throttled, but I should not have data loss, right? I saw
this line in the producer:

https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L202





Re: How to write from Flink to a write throttled database?

Posted by Jing Lu <aj...@gmail.com>.
Hi Caizhi,

Here is my current configuration:

val dynamoDBSinkConfig: DynamoDBSinkConfig =
  (new DynamoDBSinkConfig.Builder).batchSize(50).queueLimit(20).build()

new FlinkDynamoDBSink[Row](
  dynamoDBBuilder,
  "tablename",
  dynamoDBSinkConfig,
  mapper
)


I think this is doing batched writes.
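
If the bursts are the problem, I guess I could make the same builder more
conservative, something like this (assuming batchSize is the number of
records per batch and queueLimit caps how many batches can be queued before
the sink blocks):

val throttleFriendlyConfig: DynamoDBSinkConfig =
  (new DynamoDBSinkConfig.Builder)
    .batchSize(25)   // DynamoDB's BatchWriteItem accepts at most 25 items per request
    .queueLimit(5)   // assumed: fewer queued batches means backpressure instead of bursts
    .build()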





Re: How to write from Flink to a write throttled database?

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Thanks for the clarification.

I'm not familiar with DynamoDB, so you might want to modify this connector
a bit. Does a WriteRequest immediately send a write request to the database?
If so, you may want to cache the requests in memory instead and send them
only at snapshots. See [1] for the code that handles incoming records and
[2] for the snapshot logic.

[1]
https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L127
[2]
https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L202
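
A very rough sketch of that idea (not the connector's actual code, just the
shape of a sink that caches records in memory and writes them out when a
checkpoint is taken; writeBatch is a placeholder for the real DynamoDB batch
write):

import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

import scala.collection.mutable.ArrayBuffer

class BufferingSink[T](writeBatch: Seq[T] => Unit)
    extends RichSinkFunction[T] with CheckpointedFunction {

  private val buffer = ArrayBuffer.empty[T]

  override def invoke(value: T): Unit = {
    buffer += value                  // only cache the record, do not write yet
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    if (buffer.nonEmpty) {
      writeBatch(buffer.toList)      // flush everything when a checkpoint is taken
      buffer.clear()
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // A real implementation would also put the buffer into operator state here,
    // so records received between checkpoints survive a failure and restart.
  }
}

Flushing only at snapshots ties the write rate to the checkpoint interval,
and a checkpoint only completes after the batch has been accepted by the
database.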


Re: How to write from Flink to a write throttled database?

Posted by Jing Lu <aj...@gmail.com>.
Hi Caizhi,

Thanks for your reply! The database is DynamoDB. The connector I use is
https://github.com/klarna-incubator/flink-connector-dynamodb. My source is
a continuous event stream. My Flink version is 1.12.

Best,
Jing


Re: How to write from Flink to a write throttled database?

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Which database are you referring to? If there is no officially supported
connector for this database, you can create your own sink operator with
GenericWriteAheadSink.
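
Roughly, that means extending GenericWriteAheadSink and implementing
sendValues, which is only invoked with the records logged for a completed
checkpoint. A sketch (the CheckpointCommitter and the actual batch write are
placeholders, and the signatures are worth double-checking against your
Flink version):

import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.runtime.operators.{CheckpointCommitter, GenericWriteAheadSink}

import scala.collection.JavaConverters._

class WriteAheadDatabaseSink[IN](
    committer: CheckpointCommitter,    // placeholder: tracks which checkpoints were committed
    serializer: TypeSerializer[IN],
    jobId: String,
    writeBatch: Seq[IN] => Boolean)    // placeholder: the actual database batch write
  extends GenericWriteAheadSink[IN](committer, serializer, jobId) {

  override protected def sendValues(
      values: java.lang.Iterable[IN],
      checkpointId: Long,
      timestamp: Long): Boolean = {
    // Return false to have Flink retry this checkpoint's records later.
    writeBatch(values.asScala.toSeq)
  }
}

Note that it is a stream operator rather than a SinkFunction, so it is
attached with stream.transform(...) instead of addSink(...).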

Note that if you're using Flink < 1.14 and your source is bounded (that is
to say, it will eventually come to an end and finish the job), you might
lose the last bit of results. See [1] for details.

[1] https://lists.apache.org/thread/qffl2pvnng9kkd51z5xp65x7ssnnm526
