Posted to user@flink.apache.org by David Magalhães <sp...@gmail.com> on 2020/03/05 18:54:27 UTC

Weird behaviour testing TwoPhaseCommit

I've implemented a CustomSink with TwoPhaseCommit. To test it, I've created
a test based on this one [1], and it works fine.

To test the integration with S3 (and with an exponential backoff), I've
tried to implement a new test, using the following code:

...
val invalidWriter = writer
      .asInstanceOf[WindowParquetGenericRecordListFileSink]
      .copy(filePath = s"s3a://bucket_that_doesnt_exists/")

val records: Iterable[GenericRecord] = Iterable apply {
    new GenericData.Record(GenericRecordSchema.schema) {
        put(KEY.name, "x")
        put(EVENT.name, "record.value()")
        put(LOGGER_TIMESTAMP.name, "2020-01-01T02:22:23.123456Z")
        put(EVENT_TYPE.name, "xpto")
    }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

env
    .enableCheckpointing(1000)
    .fromElements(records)
    .addSink(invalidWriter)

val task = executor.submit(() => env.execute("s3exponential"))
...

This sets up a small environment with one record and enables checkpointing
(required for the TPC to work), and then executes the job in another thread
so the test can check whether the error count is increasing.

So, the test has the following behaviour:

If I use enableCheckpointing(10), the test passes 9 out of 10 times.
If I use other values, like 1000, the test fails most of the time, if not
every time.

Here is a small example of the log when the test is successful.

2020-03-05 16:04:40,342 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - Invoke - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1]
2020-03-05 16:04:40,342 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting
checkpoint (5) CHECKPOINT on task Sink: Unnamed (2/2)
2020-03-05 16:04:40,342 DEBUG
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  -
WindowParquetGenericRecordListFileSink 1/2 - checkpoint 5 triggered,
flushing transaction
'TransactionHolder{handle=f3b667c1-8d75-4ab8-9cab-71dfa6a71271,
transactionStartTime=1583424280304}'
### 2020-03-05 16:04:40,342 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - Pre Commit - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1] -
openedTransactions [1]
### 2020-03-05 16:04:40,342 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - operation='preCommit', message='Start writing #1 records'

When the test fails, here is part of the log:

2020-03-05 16:38:44,386 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - Invoke - 7c5dd046-54b8-4b13-b360-397da6743b3b [1]
2020-03-05 16:38:44,386 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Aborting
checkpoint via cancel-barrier 5 for task Sink: Unnamed (1/2)
2020-03-05 16:38:44,387 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Sink:
Unnamed (1/2) (e5699032cb2eb874dcff740b6b7b62f1): End of stream alignment,
feeding buffered data back.
2020-03-05 16:38:44,390 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition  -
ReleaseOnConsumptionResultPartition
8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
[PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions]: Received
consumed notification for subpartition 0.
2020-03-05 16:38:44,390 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager  -
Received consume notification from ReleaseOnConsumptionResultPartition
8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
[PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
2020-03-05 16:38:44,390 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition  - Source:
Collection Source (1/1) (b32dcfe3631e86d18cb893a258a6f0f9): Releasing
ReleaseOnConsumptionResultPartition
8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
[PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].

Not sure why this behaviour changes with the checkpoint interval, but so far
I haven't found the reason why "pre commit" isn't executed.

Does anyone have any thoughts, or is there something I'm missing?

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java

Re: Weird behaviour testing TwoPhaseCommit

Posted by David Magalhães <sp...@gmail.com>.
Awesome Arvid, thanks a lot! :)

And I thought when doing this that I was simplifying the test ...


Re: Weird behaviour testing TwoPhaseCommit

Posted by Arvid Heise <ar...@ververica.com>.
Hi David,

Bounded sources do not work well with checkpointing. As soon as the source
is drained, no more checkpoints are performed. It's an unfortunate
limitation that we want to get rid of, but we haven't found the time (because
it requires larger changes).

So for your test to work, you need to add a source that is continuously
open, but does not output more than one element. Fortunately, there is
already a working implementation in our test bed.
https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
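
For illustration only, here is a minimal Scala sketch of the idea of a source that stays open after emitting its elements. It is not the FiniteTestSource linked above. The class name EmitOnceThenIdleSource is hypothetical, and the sketch assumes the elements are Java-serializable, since the source function instance is serialized when the job is submitted.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical helper, not part of Flink: emits the given elements once and
// then idles until cancelled, so the source never finishes and periodic
// checkpoints can still reach the sink.
class EmitOnceThenIdleSource[T](elements: Seq[T]) extends SourceFunction[T] {

  // Flipped by cancel(), which Flink calls from a different thread.
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    // Emit under the checkpoint lock so emission does not interleave with a checkpoint.
    ctx.getCheckpointLock.synchronized {
      elements.foreach(e => ctx.collect(e))
    }
    // Keep the source alive without producing further output.
    while (running) {
      Thread.sleep(50)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
```

With a source like this in place of fromElements (via env.addSource), the job no longer ends on its own, so the test has to stop it itself once the assertion on the error count has been made.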
