Posted to issues@flink.apache.org by "Nico Kruber (JIRA)" <ji...@apache.org> on 2017/06/12 12:48:00 UTC

[jira] [Closed] (FLINK-6791) Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation

     [ https://issues.apache.org/jira/browse/FLINK-6791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nico Kruber closed FLINK-6791.
------------------------------
    Resolution: Invalid

Unfortunately, I forgot to post the error message I saw back then and probably mixed it up, since I now only receive messages similar to:

{code}
2017-06-12 14:20:25,133 ERROR akka.remote.EndpointWriter                                    - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@localhost:6123/user/jobmanager#454802553]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 13052581 bytes.
{code}

Flink shows the symptoms described in this issue after an {{akka.remote.OversizedPayloadException}}; they are not caused by the {{MemoryStateBackend}}'s max state size.
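
To make the numbers in the log message concrete, here is a minimal sketch that compares the encoded size of the {{AcknowledgeCheckpoint}} message with the frame limit reported by Akka. The class {{FrameSizeCheck}} and its methods are hypothetical helpers for illustration only; they are not part of Flink or Akka, and the two sizes are simply copied from the log above.

```java
// Hypothetical helper, not part of Flink or Akka: compares an encoded message
// size against the maximum frame size, using the numbers from the log above.
final class FrameSizeCheck {

    /** Limit reported in the log message: 10485760 bytes (10 MiB). */
    static final long MAX_FRAME_BYTES = 10_485_760L;

    /** Returns true if a message of the given encoded size exceeds the limit. */
    static boolean isOversized(long encodedBytes, long maxFrameBytes) {
        return encodedBytes > maxFrameBytes;
    }

    /** Returns the number of bytes above the limit, or 0 if the message fits. */
    static long excessBytes(long encodedBytes, long maxFrameBytes) {
        return Math.max(0L, encodedBytes - maxFrameBytes);
    }

    public static void main(String[] args) {
        // Encoded size of AcknowledgeCheckpoint, taken from the log message.
        long ackSize = 13_052_581L;
        System.out.println("oversized: " + isOversized(ackSize, MAX_FRAME_BYTES));
        System.out.println("excess bytes: " + excessBytes(ackSize, MAX_FRAME_BYTES));
    }
}
```

With the values from the log, the acknowledgement is 2566821 bytes over the limit, so it is discarded rather than delivered to the job manager. The limit that matters here is therefore the Akka frame size (in Flink, the {{akka.framesize}} setting), not the max state size passed to the {{MemoryStateBackend}}.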

> Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6791
>                 URL: https://issues.apache.org/jira/browse/FLINK-6791
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0, 1.2.1, 1.4.0
>            Reporter: Nico Kruber
>
> If the `MemoryStateBackend` is used as the checkpoint stream back-end of, e.g., a RocksDBStateBackend, it will block further checkpoint/savepoint creation once the checkpoint data reaches the back-end's max state size. In that case, an error message is logged at the task manager, but the checkpoint or savepoint never completes; although the job continues, no further checkpoints are made.
> Please see the following example that should be reproducible:
> {code:java}
> env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend(1000 * 1024 * 1024, false), false));
> env.enableCheckpointing(100L);
> final long numKeys = 100_000L;
> DataStreamSource<Tuple1<Long>> source1 =
> 		env.addSource(new RichParallelSourceFunction<Tuple1<Long>>() {
> 			private volatile boolean running = true;
> 			@Override
> 			public void run(SourceContext<Tuple1<Long>> ctx) throws Exception {
> 				long counter = 0;
> 				while (running) {
> 					synchronized (ctx.getCheckpointLock()) {
> 						ctx.collect(Tuple1.of(counter % numKeys));
> 						counter++;
> 					}
> 					Thread.yield();
> 				}
> 			}
> 			@Override
> 			public void cancel() {
> 				running = false;
> 			}
> 		});
> source1.keyBy(0)
> 		.map(new RichMapFunction<Tuple1<Long>, Tuple1<Long>>() {
> 			private transient ValueState<List<Long>> val;
> 			@Override
> 			public Tuple1<Long> map(Tuple1<Long> value)
> 					throws Exception {
> 				val.update(Collections.nCopies(100, value.f0));
> 				return value;
> 			}
> 			@Override
> 			public void open(final Configuration parameters) throws Exception {
> 				ValueStateDescriptor<List<Long>> descriptor =
> 						new ValueStateDescriptor<>(
> 								"data", // the state name
> 								TypeInformation.of(new TypeHint<List<Long>>() {
> 								}) // type information
> 						);
> 				val = getRuntimeContext().getState(descriptor);
> 			}
> 		}).uid("identity-map-with-state")
> 		.addSink(new DiscardingSink<Tuple1<Long>>());
> env.execute("failingsnapshots");
> {code}


