You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Francisco Rosa (JIRA)" <ji...@apache.org> on 2017/06/02 02:35:04 UTC

[jira] [Updated] (FLINK-6808) Stream join fails when checkpointing is enabled

     [ https://issues.apache.org/jira/browse/FLINK-6808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Francisco Rosa updated FLINK-6808:
----------------------------------
    Description: 
The combination of joining streams and checkpointing fails in 1.3.0. It used to work with the previous 1.2 version. Code example for failure:

{code:title=Example|borderStyle=solid}
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoints
        env.enableCheckpointing(5000);

        // create two streams
        DataStreamSource<Long> one = env.generateSequence(0, 5000);
        DataStreamSource<Long> two = env.generateSequence(2000, 15000);

        // process both, provide a delay to make sure checkpoint will happen
        DataStream<String> oneProcessed = one.
                map(oneValue -> {
                    Thread.sleep(1000);
                    return "val-" + oneValue;
                });
        DataStream<String> twoProcessed = two.
                map(oneValue -> {
                    Thread.sleep(1000);
                    return "val-" + oneValue;
                });

        // join the two streams, join on string match
        DataStream<String> joinedStreams = oneProcessed.
                join(twoProcessed).
                where(String::toString).
                equalTo(String::toString).
                window(TumblingProcessingTimeWindows.of(Time.seconds(5))).
                apply(new JoinFunction<String, String, String>() {
                    @Override
                    public String join(String oneValue, String twoValue) {
                        // nothing really relevant, just concatenate string
                        return oneValue + "+" + twoValue;
                    }
                });

        // output results
        joinedStreams.print();

        env.execute("Issue with stream join and checkpoints");
    }
{code}

Stack trace:

java.lang.Exception: Could not perform checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
	... 8 more
Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
	at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
	at org.apache.flink.runtime.state.ArrayListSerializer.snapshotConfiguration(ArrayListSerializer.java:149)
	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
	... 13 more

  was:
The combination of joining streams and checkpointing fails in 1.3.0. It used to work with the previous 1.2 version. Code example for failure:

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoints
        env.enableCheckpointing(5000);

        // create two streams
        DataStreamSource<Long> one = env.generateSequence(0, 5000);
        DataStreamSource<Long> two = env.generateSequence(2000, 15000);

        // process both, provide a delay to make sure checkpoint will happen
        DataStream<String> oneProcessed = one.
                map(oneValue -> {
                    Thread.sleep(1000);
                    return "val-" + oneValue;
                });
        DataStream<String> twoProcessed = two.
                map(oneValue -> {
                    Thread.sleep(1000);
                    return "val-" + oneValue;
                });

        // join the two streams, join on string match
        DataStream<String> joinedStreams = oneProcessed.
                join(twoProcessed).
                where(String::toString).
                equalTo(String::toString).
                window(TumblingProcessingTimeWindows.of(Time.seconds(5))).
                apply(new JoinFunction<String, String, String>() {
                    @Override
                    public String join(String oneValue, String twoValue) {
                        // nothing really relevant, just concatenate string
                        return oneValue + "+" + twoValue;
                    }
                });

        // output results
        joinedStreams.print();

        env.execute("Issue with stream join and checkpoints");
    }

Stack trace:

java.lang.Exception: Could not perform checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
	... 8 more
Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
	at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
	at org.apache.flink.runtime.state.ArrayListSerializer.snapshotConfiguration(ArrayListSerializer.java:149)
	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
	... 13 more


> Stream join fails when checkpointing is enabled
> -----------------------------------------------
>
>                 Key: FLINK-6808
>                 URL: https://issues.apache.org/jira/browse/FLINK-6808
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.0
>            Reporter: Francisco Rosa
>             Fix For: 1.3.1
>
>
> The combination of joining streams and checkpointing fails in 1.3.0. It used to work with the previous 1.2 version. Code example for failure:
> {code:title=Example|borderStyle=solid}
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // enable checkpoints
>         env.enableCheckpointing(5000);
>         // create two streams
>         DataStreamSource<Long> one = env.generateSequence(0, 5000);
>         DataStreamSource<Long> two = env.generateSequence(2000, 15000);
>         // process both, provide a delay to make sure checkpoint will happen
>         DataStream<String> oneProcessed = one.
>                 map(oneValue -> {
>                     Thread.sleep(1000);
>                     return "val-" + oneValue;
>                 });
>         DataStream<String> twoProcessed = two.
>                 map(oneValue -> {
>                     Thread.sleep(1000);
>                     return "val-" + oneValue;
>                 });
>         // join the two streams, join on string match
>         DataStream<String> joinedStreams = oneProcessed.
>                 join(twoProcessed).
>                 where(String::toString).
>                 equalTo(String::toString).
>                 window(TumblingProcessingTimeWindows.of(Time.seconds(5))).
>                 apply(new JoinFunction<String, String, String>() {
>                     @Override
>                     public String join(String oneValue, String twoValue) {
>                         // nothing really relevant, just concatenate string
>                         return oneValue + "+" + twoValue;
>                     }
>                 });
>         // output results
>         joinedStreams.print();
>         env.execute("Issue with stream join and checkpoints");
>     }
> {code}
> Stack trace:
> java.lang.Exception: Could not perform checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> 	... 8 more
> Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
> 	at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
> 	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
> 	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
> 	at org.apache.flink.runtime.state.ArrayListSerializer.snapshotConfiguration(ArrayListSerializer.java:149)
> 	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
> 	... 13 more



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)