Posted to issues@flink.apache.org by "Leonard Xu (Jira)" <ji...@apache.org> on 2021/01/20 03:20:00 UTC

[jira] [Comment Edited] (FLINK-21032) JsonFileCompactionITCase fails on azure

    [ https://issues.apache.org/jira/browse/FLINK-21032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268345#comment-17268345 ] 

Leonard Xu edited comment on FLINK-21032 at 1/20/21, 3:19 AM:
--------------------------------------------------------------

 

From the detailed log, the job has three compact-operator (streaming-writer) subtasks doing the compaction work. One of them hit an exception during a checkpoint: it did not finish that checkpoint and did not produce the expected compacted files.
{code:java}
11:36:47,761 [Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (2/3)#0] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 checkpointing for checkpoint with id=4 (max part counter=66).
11:36:47,761 [Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (3/3)#0] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 2 checkpointing for checkpoint with id=4 (max part counter=66).
11:36:47,762 [Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (1/3)#0] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=4 (max part counter=68).
11:36:47,970 [compact-operator (1/3)#0] INFO  org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - Compaction time cost is '0.202S', target file is 'file:/tmp/junit8284797133733926241/junit3368655018742742833/compacted-part-10e63de4-6d59-4bfa-9292-c7741c360c96-0-34', input files are '[file:/tmp/junit8284797133733926241/junit3368655018742742833/.uncompacted-part-10e63de4-6d59-4bfa-9292-c7741c360c96-0-34...]'
11:36:47,977 [jobmanager-future-thread-26] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 4 for job 7c53c3e6a6dd259c6cfa396242680c31 (17514 bytes in 218 ms).
11:36:47,982 [AsyncOperations-thread-1] INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (3/3)#0 - asynchronous part of checkpoint 5 could not be completed.
java.util.concurrent.CancellationException: null
	at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_275]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_275]
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:621) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:60) ~[flink-streaming-java_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:122) [flink-streaming-java_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
{code}
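The effect of an unfinished checkpoint on compaction can be illustrated with a toy model. This is a simplified sketch under stated assumptions and is not Flink's Buckets or compaction code. It assumes that files written after a checkpoint stay pending until a later checkpoint covering them completes on that subtask, and that only committed files ever reach the compaction step. It also tracks completion per subtask and does not model how Flink coordinates checkpoint completion across subtasks. ToyWriterSubtask and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model of one file-writer subtask: files become visible to
 * compaction only after the checkpoint that rolled them completes.
 * This is NOT Flink's Buckets/compaction implementation.
 */
final class ToyWriterSubtask {
    private final List<String> inProgress = new ArrayList<>();
    // checkpoint id -> files rolled from in-progress to pending by that checkpoint
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    void write(String file) {
        inProgress.add(file);
    }

    /** Snapshot for a checkpoint: roll all in-progress files to pending. */
    void snapshot(long checkpointId) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(inProgress);
        inProgress.clear();
    }

    /** Commit every pending file rolled by this or an earlier checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = pending.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            committed.addAll(files);
        }
        done.clear(); // the head-map view removes the entries from the backing map
    }

    /** Files that the compaction step would be allowed to see in this model. */
    List<String> committedFiles() {
        return committed;
    }

    public static void main(String[] args) {
        ToyWriterSubtask healthy = new ToyWriterSubtask();
        ToyWriterSubtask cancelled = new ToyWriterSubtask();
        for (ToyWriterSubtask s : Arrays.asList(healthy, cancelled)) {
            s.write("part-a");
            s.snapshot(4);
            s.notifyCheckpointComplete(4);
            s.write("part-b");
            s.snapshot(5);
        }
        // Only the first subtask ever sees checkpoint 5 complete; for the
        // second one the asynchronous part of the checkpoint was cancelled.
        healthy.notifyCheckpointComplete(5);
        System.out.println(healthy.committedFiles());   // [part-a, part-b]
        System.out.println(cancelled.committedFiles()); // [part-a]
    }
}
```

In this model, part-b on the second subtask stays pending forever, so nothing downstream can compact it. That matches the symptom in the log, although the real coordination in Flink is more involved.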
 

As a result, when the data is read back from the compacted files, some of it is missing.
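To see exactly which rows were lost, the expected and actual lists from the assertion can be compared as multisets, because rows legitimately appear more than once. The sketch below is a hypothetical helper: MissingRows is not part of the Flink test code. For each row, it counts how many more times the row was expected than read back. The sample data is the first few rows of the assertion message in the issue description.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: multiset difference between expected and actual rows. */
final class MissingRows {

    /** Returns row -> how many more times it was expected than actually read. */
    static Map<String, Integer> missing(List<String> expected, List<String> actual) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String row : expected) {
            counts.merge(row, 1, Integer::sum);
        }
        for (String row : actual) {
            counts.merge(row, -1, Integer::sum);
        }
        // Keep only rows that are under-represented in the actual output.
        counts.values().removeIf(c -> c <= 0);
        return counts;
    }

    public static void main(String[] args) {
        // First rows of the assertion message from the test failure.
        List<String> expected = Arrays.asList(
                "+I[0, 0, 0]", "+I[0, 0, 0]", "+I[1, 1, 1]", "+I[1, 1, 1]",
                "+I[2, 2, 2]", "+I[2, 2, 2]");
        List<String> actual = Arrays.asList(
                "+I[0, 0, 0]", "+I[0, 0, 0]", "+I[1, 1, 1]", "+I[1, 1, 1]",
                "+I[2, 2, 2]");
        System.out.println(missing(expected, actual)); // {+I[2, 2, 2]=1}
    }
}
```

A plain set difference would report nothing here, because every distinct row still appears at least once. Counting occurrences is what exposes the loss.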

I think this test fails because the source operator (ParallelFiniteTestSource) still runs with a parallelism of 3 rather than 1. When the source has multiple parallel subtasks and finishes within the test (in a real job the source would not stop), one source subtask may already have received endInput while the others are still waiting for a checkpoint or still running. This situation may lead to the problem above.
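The hypothesis can be checked against the assertion in the issue description. Comparing the expected and actual lists there shows that exactly 33 rows are missing: one copy of each id with id % 3 == 2 (2, 5, 8, ..., 98). The sketch below reproduces that pattern under two assumptions that this thread does not confirm. First, it assumes the finite source assigns element i to subtask i % parallelism. Second, it assumes the source emits its elements twice, which would explain why every expected row appears twice. Under those assumptions, losing one emission round on one of three subtasks drops exactly this residue class. Subtask index 2 is chosen to match streaming-writer (3/3) ("Subtask 2" in the log), whose checkpoint 5 was cancelled. LostRoundModel and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a finite source with the given parallelism that
 * assigns element i to subtask i % parallelism and emits every element twice.
 */
final class LostRoundModel {

    /** Ids that reach the output when one round of one subtask is lost. */
    static List<Integer> output(int numElements, int parallelism,
                                int lostSubtask, int lostRound) {
        List<Integer> out = new ArrayList<>();
        for (int round = 0; round < 2; round++) {
            for (int i = 0; i < numElements; i++) {
                int subtask = i % parallelism;
                if (subtask == lostSubtask && round == lostRound) {
                    continue; // this round of this subtask never reached the output
                }
                out.add(i);
            }
        }
        return out;
    }

    /** Ids that appear fewer than two times in the output, in ascending order. */
    static List<Integer> underRepresented(List<Integer> out, int numElements) {
        int[] seen = new int[numElements];
        for (int id : out) {
            seen[id]++;
        }
        List<Integer> result = new ArrayList<>();
        for (int id = 0; id < numElements; id++) {
            if (seen[id] < 2) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> out = output(100, 3, 2, 1);
        List<Integer> missing = underRepresented(out, 100);
        System.out.println(out.size());     // 167
        System.out.println(missing.size()); // 33
        System.out.println(missing.get(0) + ", " + missing.get(1) + ", ..., "
                + missing.get(missing.size() - 1)); // 2, 5, ..., 98
    }
}
```

The model only shows that the observed loss pattern is consistent with a single one of three source/writer subtasks losing its data. It does not by itself demonstrate the endInput/checkpoint race.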

 

What do you think? [~dwysakowicz] [~lzljs3620320]


> JsonFileCompactionITCase fails on azure
> ---------------------------------------
>
>                 Key: FLINK-21032
>                 URL: https://issues.apache.org/jira/browse/FLINK-21032
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Ecosystem
>            Reporter: Dawid Wysakowicz
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12230&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=03dca39c-73e8-5aaf-601d-328ae5c35f20
> {code}
> [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 11.39 s <<< FAILURE! - in org.apache.flink.formats.json.JsonFileCompactionITCase
> [ERROR] testSingleParallelism(org.apache.flink.formats.json.JsonFileCompactionITCase)  Time elapsed: 1.21 s  <<< FAILURE!
> java.lang.AssertionError: expected:<[+I[0, 0, 0], +I[0, 0, 0], +I[1, 1, 1], +I[1, 1, 1], +I[2, 2, 2], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3], +I[4, 4, 4], +I[4, 4, 4], +I[5, 5, 5], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7], +I[7, 7, 7], +I[8, 8, 8], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0, 0], +I[10, 0, 0], +I[11, 1, 1], +I[11, 1, 1], +I[12, 2, 2], +I[12, 2, 2], +I[13, 3, 3], +I[13, 3, 3], +I[14, 4, 4], +I[14, 4, 4], +I[15, 5, 5], +I[15, 5, 5], +I[16, 6, 6], +I[16, 6, 6], +I[17, 7, 7], +I[17, 7, 7], +I[18, 8, 8], +I[18, 8, 8], +I[19, 9, 9], +I[19, 9, 9], +I[20, 0, 0], +I[20, 0, 0], +I[21, 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[22, 2, 2], +I[23, 3, 3], +I[23, 3, 3], +I[24, 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[25, 5, 5], +I[26, 6, 6], +I[26, 6, 6], +I[27, 7, 7], +I[27, 7, 7], +I[28, 8, 8], +I[28, 8, 8], +I[29, 9, 9], +I[29, 9, 9], +I[30, 0, 0], +I[30, 0, 0], +I[31, 1, 1], +I[31, 1, 1], +I[32, 2, 2], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3], +I[34, 4, 4], +I[34, 4, 4], +I[35, 5, 5], +I[35, 5, 5], +I[36, 6, 6], +I[36, 6, 6], +I[37, 7, 7], +I[37, 7, 7], +I[38, 8, 8], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9], +I[40, 0, 0], +I[40, 0, 0], +I[41, 1, 1], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43, 3, 3], +I[43, 3, 3], +I[44, 4, 4], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5], +I[46, 6, 6], +I[46, 6, 6], +I[47, 7, 7], +I[47, 7, 7], +I[48, 8, 8], +I[48, 8, 8], +I[49, 9, 9], +I[49, 9, 9], +I[50, 0, 0], +I[50, 0, 0], +I[51, 1, 1], +I[51, 1, 1], +I[52, 2, 2], +I[52, 2, 2], +I[53, 3, 3], +I[53, 3, 3], +I[54, 4, 4], +I[54, 4, 4], +I[55, 5, 5], +I[55, 5, 5], +I[56, 6, 6], +I[56, 6, 6], +I[57, 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[58, 8, 8], +I[59, 9, 9], +I[59, 9, 9], +I[60, 0, 0], +I[60, 0, 0], +I[61, 1, 1], +I[61, 1, 1], +I[62, 2, 2], +I[62, 2, 2], +I[63, 3, 3], +I[63, 3, 3], +I[64, 4, 4], +I[64, 4, 4], +I[65, 5, 5], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6], +I[67, 7, 7], +I[67, 7, 7], +I[68, 8, 8], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70, 0, 0], +I[70, 0, 0], +I[71, 1, 1], +I[71, 1, 1], +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3], +I[73, 3, 3], +I[74, 4, 4], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5], +I[76, 6, 6], +I[76, 6, 6], +I[77, 7, 7], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8], +I[79, 9, 9], +I[79, 9, 9], +I[80, 0, 0], +I[80, 0, 0], +I[81, 1, 1], +I[81, 1, 1], +I[82, 2, 2], +I[82, 2, 2], +I[83, 3, 3], +I[83, 3, 3], +I[84, 4, 4], +I[84, 4, 4], +I[85, 5, 5], +I[85, 5, 5], +I[86, 6, 6], +I[86, 6, 6], +I[87, 7, 7], +I[87, 7, 7], +I[88, 8, 8], +I[88, 8, 8], +I[89, 9, 9], +I[89, 9, 9], +I[90, 0, 0], +I[90, 0, 0], +I[91, 1, 1], +I[91, 1, 1], +I[92, 2, 2], +I[92, 2, 2], +I[93, 3, 3], +I[93, 3, 3], +I[94, 4, 4], +I[94, 4, 4], +I[95, 5, 5], +I[95, 5, 5], +I[96, 6, 6], +I[96, 6, 6], +I[97, 7, 7], +I[97, 7, 7], +I[98, 8, 8], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]> but was:<[+I[0, 0, 0], +I[0, 0, 0], +I[1, 1, 1], +I[1, 1, 1], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3], +I[4, 4, 4], +I[4, 4, 4], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7], +I[7, 7, 7], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0, 0], +I[10, 0, 0], +I[11, 1, 1], +I[12, 2, 2], +I[12, 2, 2], +I[13, 3, 3], +I[13, 3, 3], +I[14, 4, 4], +I[15, 5, 5], +I[15, 5, 5], +I[16, 6, 6], +I[16, 6, 6], +I[17, 7, 7], +I[18, 8, 8], +I[18, 8, 8], +I[19, 9, 9], +I[19, 9, 9], +I[20, 0, 0], +I[21, 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[22, 2, 2], +I[23, 3, 3], +I[24, 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[25, 5, 5], +I[26, 6, 6], +I[27, 7, 7], +I[27, 7, 7], +I[28, 8, 8], +I[28, 8, 8], +I[29, 9, 9], +I[30, 0, 0], +I[30, 0, 0], +I[31, 1, 1], +I[31, 1, 1], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3], +I[34, 4, 4], +I[34, 4, 4], +I[35, 5, 5], +I[36, 6, 6], +I[36, 6, 6], +I[37, 7, 7], +I[37, 7, 7], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9], +I[40, 0, 0], +I[40, 0, 0], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43, 3, 3], +I[43, 3, 3], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5], +I[46, 6, 6], +I[46, 6, 6], +I[47, 7, 7], +I[48, 8, 8], +I[48, 8, 8], +I[49, 9, 9], +I[49, 9, 9], +I[50, 0, 0], +I[51, 1, 1], +I[51, 1, 1], +I[52, 2, 2], +I[52, 2, 2], +I[53, 3, 3], +I[54, 4, 4], +I[54, 4, 4], +I[55, 5, 5], +I[55, 5, 5], +I[56, 6, 6], +I[57, 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[58, 8, 8], +I[59, 9, 9], +I[60, 0, 0], +I[60, 0, 0], +I[61, 1, 1], +I[61, 1, 1], +I[62, 2, 2], +I[63, 3, 3], +I[63, 3, 3], +I[64, 4, 4], +I[64, 4, 4], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6], +I[67, 7, 7], +I[67, 7, 7], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70, 0, 0], +I[70, 0, 0], +I[71, 1, 1], +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3], +I[73, 3, 3], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5], +I[76, 6, 6], +I[76, 6, 6], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8], +I[79, 9, 9], +I[79, 9, 9], +I[80, 0, 0], +I[81, 1, 1], +I[81, 1, 1], +I[82, 2, 2], +I[82, 2, 2], +I[83, 3, 3], +I[84, 4, 4], +I[84, 4, 4], +I[85, 5, 5], +I[85, 5, 5], +I[86, 6, 6], +I[87, 7, 7], +I[87, 7, 7], +I[88, 8, 8], +I[88, 8, 8], +I[89, 9, 9], +I[90, 0, 0], +I[90, 0, 0], +I[91, 1, 1], +I[91, 1, 1], +I[92, 2, 2], +I[93, 3, 3], +I[93, 3, 3], +I[94, 4, 4], +I[94, 4, 4], +I[95, 5, 5], +I[96, 6, 6], +I[96, 6, 6], +I[97, 7, 7], +I[97, 7, 7], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]>
> 	at org.junit.Assert.fail(Assert.java:88)
> 	at org.junit.Assert.failNotEquals(Assert.java:834)
> 	at org.junit.Assert.assertEquals(Assert.java:118)
> 	at org.junit.Assert.assertEquals(Assert.java:144)
> 	at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.assertIterator(CompactionITCaseBase.java:134)
> 	at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.innerTestNonPartition(CompactionITCaseBase.java:109)
> 	at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.testSingleParallelism(CompactionITCaseBase.java:96)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)