Posted to issues@flink.apache.org by "Nico Kruber (JIRA)" <ji...@apache.org> on 2017/06/01 10:42:04 UTC
[jira] [Created] (FLINK-6791) Using MemoryStateBackend as
checkpoint stream back-end may block checkpoint/savepoint creation
Nico Kruber created FLINK-6791:
----------------------------------
Summary: Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation
Key: FLINK-6791
URL: https://issues.apache.org/jira/browse/FLINK-6791
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.2.1, 1.3.0
Reporter: Nico Kruber
If a `MemoryStateBackend` is used as the checkpoint stream back-end of, e.g., a `RocksDBStateBackend`, it blocks further checkpoint/savepoint creation once the checkpoint data reaches the back-end's maximum state size. In that case, an error message is logged at the task manager, but the checkpoint or savepoint never completes. The job keeps running, yet no further checkpoints are made.
The following example should reproduce the problem:
{code:java}
env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend(1000 * 1024 * 1024, false), false));
env.enableCheckpointing(100L);

final long numKeys = 100_000L;
DataStreamSource<Tuple1<Long>> source1 =
        env.addSource(new RichParallelSourceFunction<Tuple1<Long>>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple1<Long>> ctx) throws Exception {
                long counter = 0;
                while (running) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(Tuple1.of(counter % numKeys));
                        counter++;
                    }
                    Thread.yield();
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

source1.keyBy(0)
        .map(new RichMapFunction<Tuple1<Long>, Tuple1<Long>>() {
            private transient ValueState<List<Long>> val;

            @Override
            public Tuple1<Long> map(Tuple1<Long> value)
                    throws Exception {
                val.update(Collections.nCopies(100, value.f0));
                return value;
            }

            @Override
            public void open(final Configuration parameters) throws Exception {
                ValueStateDescriptor<List<Long>> descriptor =
                        new ValueStateDescriptor<>(
                                "data", // the state name
                                TypeInformation.of(new TypeHint<List<Long>>() {
                                }) // type information
                        );
                val = getRuntimeContext().getState(descriptor);
            }
        }).uid("identity-map-with-state")
        .addSink(new DiscardingSink<Tuple1<Long>>());

env.execute("failingsnapshots");
{code}
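
For readers without a Flink setup, the size-limit failure described above can be modeled in isolation. This is a simplified sketch, not Flink's implementation: `CappedMemoryStream`, `CheckpointSizeDemo`, and `snapshotCompletes` are hypothetical names, and it assumes the cap is checked when the in-memory stream is closed. It only shows how a snapshot larger than the configured maximum ends in an exception rather than a completed checkpoint.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/** Hypothetical stand-in for a size-capped in-memory checkpoint stream (not a Flink class). */
final class CappedMemoryStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxStateSize;

    CappedMemoryStream(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }

    /** Appends snapshot data; no limit is enforced while writing. */
    void write(byte[] data) {
        buffer.write(data, 0, data.length);
    }

    /** Assumption: the cap is enforced only here, when the stream is closed. */
    byte[] closeAndGetBytes() throws IOException {
        if (buffer.size() > maxStateSize) {
            throw new IOException(
                    "state size " + buffer.size() + " exceeds maximum " + maxStateSize);
        }
        return buffer.toByteArray();
    }
}

final class CheckpointSizeDemo {
    /** Returns true if a snapshot of stateBytes bytes completes under the given cap. */
    static boolean snapshotCompletes(int stateBytes, int maxStateSize) {
        CappedMemoryStream stream = new CappedMemoryStream(maxStateSize);
        stream.write(new byte[stateBytes]);
        try {
            stream.closeAndGetBytes();
            return true;
        } catch (IOException e) {
            // The snapshot is lost; in the report, the matching error is only logged.
            return false;
        }
    }

    public static void main(String[] args) {
        int cap = 1024; // illustrative cap in bytes
        System.out.println(snapshotCompletes(512, cap));
        System.out.println(snapshotCompletes(2048, cap));
    }
}
```

In this model the caller sees the failure and can react to it. The bug reported here is that the corresponding failure in Flink is logged but never completes or aborts the pending checkpoint.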