You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Stefan Richter (JIRA)" <ji...@apache.org> on 2017/10/19 05:24:00 UTC

[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

    [ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210596#comment-16210596 ] 

Stefan Richter edited comment on FLINK-5372 at 10/19/17 5:23 AM:
-----------------------------------------------------------------

Those are two separate problems and I don't think the problem from the comment is related to FLINK-7757.

First problem is, that the test uses a checkpoint stream factory that produces streams that block on latches, then wants to call {{close()}} while the stream is supposed to be blocked in an async thread executing and writing the rocksdb snapshot. While the test fails is, that the first stream is actually used to take s snapshot of the timer service into raw keyed state. The timer service is not async and therefore the test blocks and cannot reach {{close()}}. I will fix the test through a different factory, that gives a blocking stream on the second call, i.e. for the request that is actually from the keyed backend.

For the second problem, this was a bit nasty to reproduce but simple in cause and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an instance of {{RocksDBKeyedStateBackend}}, but never called the {{dispose}} method to release native resources. Since the latest RocksDB update, their code has assertions in place that are executed in {{finalize()}} and will detect leaking native resources. This happens only when the resource is GC'ed, and this explains why you could only observe the test crash when it was executed with more tests, eventually reaching a GC.


was (Author: srichter):
Those are two separate problems and I don't the problem from the comment is related to FLINK-7757.

First problem is, that the test uses a checkpoint stream factory that produces streams that block on latches, then wants to call {{close()}} while the stream is supposed to be blocked in an async thread executing and writing the rocksdb snapshot. While the test fails is, that the first stream is actually used to take s snapshot of the timer service into raw keyed state. The timer service is not async and therefore the test blocks and cannot reach {{close()}}. I will fix the test through a different factory, that gives a blocking stream on the second call, i.e. for the request that is actually from the keyed backend.

For the second problem, this was a bit nasty to reproduce but simple in cause and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an instance of {{RocksDBKeyedStateBackend}}, but never called the {{dispose}} method to release native resources. Since the latest RocksDB update, their code has assertions in place that are executed in {{finalize()}} and will detect leaking native resources. This happens only when the resource is GC'ed, and this explains why you could only observe the test crash when it was executed with more tests, eventually reaching a GC.

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --------------------------------------------------------------
>
>                 Key: FLINK-5372
>                 URL: https://issues.apache.org/jira/browse/FLINK-5372
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Stefan Richter
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change {{AsyncCheckpointOperator}} to make sure that we can run fully asynchronously. Then, the test will still fail because the canceling behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>     extends AbstractStreamOperator<String>
>     implements OneInputStreamOperator<String, String> {
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count",
>                         StringSerializer.INSTANCE, "hello"));
>     }
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>         // we also don't care
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count",
>                         StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)