Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2018/03/27 08:39:00 UTC
[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415234#comment-16415234 ]
Chesnay Schepler edited comment on FLINK-5372 at 3/27/18 8:38 AM:
------------------------------------------------------------------
The instability seems to be caused by a plain race condition in the test:
{code}
// RocksDBAsyncSnapshotTest, around line 327
// at this point the task is stuck waiting on the blocker latch
task.cancel();
// let the task continue
blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();
// we expect the task to have shut down and closed its stream, but nothing guarantees
// that it has done so by the time this assertion runs
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
{code}
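A minimal, self-contained sketch of the race described above and one way to remove it, assuming the remedy is to wait for the task to terminate before asserting on the stream. Everything here is hypothetical: {{TrackingStream}}, {{runCancelScenario}}, and the plain {{Thread}} standing in for the Flink task are illustrations only, not the Flink test harness API, and interrupting the thread is only a loose analogue of {{task.cancel()}}.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelRaceSketch {

    /** Hypothetical stand-in for the stream returned by getLastCreatedStream(). */
    static final class TrackingStream implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            closed.set(true);
        }

        boolean isClosed() {
            return closed.get();
        }
    }

    /**
     * Cancels a blocked worker, releases its latch, and only checks the stream
     * after the worker has terminated. Returns true if the stream was closed.
     */
    public static boolean runCancelScenario() {
        CountDownLatch blockerLatch = new CountDownLatch(1);
        TrackingStream stream = new TrackingStream();

        // The worker plays the role of the task: it blocks on the latch and
        // closes its stream on the way out, however it exits.
        Thread task = new Thread(() -> {
            try {
                blockerLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stream.close();
            }
        });
        task.start();

        task.interrupt();         // loosely analogous to task.cancel()
        blockerLatch.countDown(); // analogous to getBlockerLatch().trigger()

        // Calling stream.isClosed() right here would be racy: the worker may not
        // have reached its finally block yet. Joining the thread removes the race.
        try {
            task.join(10_000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !task.isAlive() && stream.isClosed();
    }

    public static void main(String[] args) {
        System.out.println("stream closed after cancel: " + runCancelScenario());
    }
}
```

The design choice is to make the assertion wait on a termination signal (here {{Thread.join}}) rather than on timing; the bounded timeout keeps a hung worker from blocking the test forever while still failing the check.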
> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --------------------------------------------------------------
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.5.0
> Reporter: Aljoscha Krettek
> Assignee: Stefan Richter
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The test is currently disabled with {{@Ignore}}. We have to change {{AsyncCheckpointOperator}} so that its checkpoints run fully asynchronously. Even then, the test will still fail, because the cancellation behaviour has changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>         extends AbstractStreamOperator<String>
>         implements OneInputStreamOperator<String, String> {
>
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count",
>                         StringSerializer.INSTANCE, "hello"));
>     }
>
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>         // we also don't care
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count",
>                         StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)