You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/09/21 20:02:39 UTC
flink git commit: [FLINK-2713] [streaming] Set state restore to lazy
to avoid StateCheckpointer issues and reduce checkpoint overhead
Repository: flink
Updated Branches:
refs/heads/master b9663c407 -> 63d9800ee
[FLINK-2713] [streaming] Set state restore to lazy to avoid StateCheckpointer issues and reduce checkpoint overhead
Closes #1154
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63d9800e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63d9800e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63d9800e
Branch: refs/heads/master
Commit: 63d9800eee5f99e11f6e9775908a9b62f048c677
Parents: b9663c4
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Sep 21 12:42:15 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Mon Sep 21 20:01:39 2015 +0200
----------------------------------------------------------------------
.../state/PartitionedStreamOperatorState.java | 38 +++++++++++++++++--
.../api/state/StreamOperatorState.java | 32 +++++++++++++---
.../api/state/StatefulOperatorTest.java | 38 +++++++++++--------
.../PartitionedStateCheckpointingITCase.java | 40 +++++++++++++++-----
.../StreamCheckpointingITCase.java | 16 +++++++-
5 files changed, 127 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index 115a97c..408a0f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -55,6 +55,8 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
private IN currentInput;
private ClassLoader cl;
+ private boolean restored = true;
+ private StateHandle<Serializable> checkpoint = null;
public PartitionedStreamOperatorState(StateCheckpointer<S, C> checkpointer,
StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
@@ -76,6 +78,10 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
if (currentInput == null) {
throw new IllegalStateException("Need a valid input for accessing the state.");
} else {
+ if (!restored) {
+ // If the state is not restored yet, restore now
+ restoreWithCheckpointer();
+ }
Serializable key;
try {
key = keySelector.getKey(currentInput);
@@ -100,6 +106,10 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
if (currentInput == null) {
throw new IllegalStateException("Need a valid input for updating a state.");
} else {
+ if (!restored) {
+ // If the state is not restored yet, restore now
+ restoreWithCheckpointer();
+ }
Serializable key;
try {
key = keySelector.getKey(currentInput);
@@ -131,18 +141,38 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
@Override
public StateHandle<Serializable> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return stateStore.snapshotStates(checkpointId, checkpointTimestamp);
+ // If the state is restored we take a snapshot, otherwise return the last checkpoint
+ return restored ? stateStore.snapshotStates(checkpointId, checkpointTimestamp) : provider
+ .createStateHandle(checkpoint.getState(cl));
}
-
+
@Override
- public void restoreState(StateHandle<Serializable> snapshots, ClassLoader userCodeClassLoader) throws Exception {
- stateStore.restoreStates(snapshots, userCodeClassLoader);
+ public void restoreState(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception {
+ // We store the snapshot for lazy restore
+ checkpoint = snapshot;
+ restored = false;
+ }
+
+ private void restoreWithCheckpointer() throws IOException {
+ try {
+ stateStore.restoreStates(checkpoint, cl);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ restored = true;
+ checkpoint = null;
}
@Override
public Map<Serializable, S> getPartitionedState() throws Exception {
return stateStore.getPartitionedState();
}
+
+ @Override
+ public void setCheckpointer(StateCheckpointer<S, C> checkpointer) {
+ super.setCheckpointer(checkpointer);
+ stateStore.setCheckPointer(checkpointer);
+ }
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index 29a19b5..c33b94e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -44,7 +44,10 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
private S state;
protected StateCheckpointer<S, C> checkpointer;
- private final StateHandleProvider<Serializable> provider;
+ protected final StateHandleProvider<Serializable> provider;
+
+ private boolean restored = true;
+ private Serializable checkpoint = null;
@SuppressWarnings("unchecked")
public StreamOperatorState(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
@@ -59,6 +62,10 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
@Override
public S value() throws IOException {
+ if (!restored) {
+ // If the state is not restored yet, restore it at this point
+ restoreWithCheckpointer();
+ }
return state;
}
@@ -67,6 +74,11 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
if (state == null) {
throw new RuntimeException("Cannot set state to null.");
}
+ if (!restored) {
+ // If the value is updated before the restore it is overwritten
+ restored = true;
+ checkpoint = null;
+ }
this.state = state;
}
@@ -90,14 +102,22 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
public StateHandle<Serializable> snapshotState(long checkpointId, long checkpointTimestamp)
throws Exception {
- return provider.createStateHandle(checkpointer.snapshotState(value(), checkpointId,
- checkpointTimestamp));
-
+ // If the state is restored we take a snapshot, otherwise return the last checkpoint
+ return provider.createStateHandle(restored ? checkpointer.snapshotState(value(), checkpointId,
+ checkpointTimestamp) : checkpoint);
}
- @SuppressWarnings("unchecked")
public void restoreState(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception {
- update(checkpointer.restoreState((C) snapshot.getState(userCodeClassLoader)));
+ // We set the checkpoint for lazy restore
+ checkpoint = snapshot.getState(userCodeClassLoader);
+ restored = false;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void restoreWithCheckpointer() throws IOException {
+ update(checkpointer.restoreState((C) checkpoint));
+ restored = true;
+ checkpoint = null;
}
public Map<Serializable, S> getPartitionedState() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 76d9e16..4aec723 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -83,11 +83,14 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
assertEquals("12345", context.getOperatorState("concat", "", false).value());
assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
- byte[] serializedState = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
+ byte[] serializedState0 = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
+ // Restore state but snapshot again before calling the value
+ byte[] serializedState = InstantiationUtil.serializeObject(createOperatorWithContext(out,
+ new ModKey(2), serializedState0).getStateSnapshotFromFunction(1, 1));
StreamMap<Integer, String> restoredMap = createOperatorWithContext(out, new ModKey(2), serializedState);
StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();
-
+
assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
@@ -227,7 +230,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
@Override
public void open(Configuration conf) throws IOException {
- counter = getRuntimeContext().getOperatorState("counter", 0, false);
+ counter = getRuntimeContext().getOperatorState("counter", 0, false, intCheckpointer);
groupCounter = getRuntimeContext().getOperatorState("groupCounter", new MutableInt(0), true);
concat = getRuntimeContext().getOperatorState("concat", "", false);
try {
@@ -279,19 +282,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
@Override
public void open(Configuration conf) throws IOException {
- groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true,
- new StateCheckpointer<Integer, String>() {
-
- @Override
- public String snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
- return state.toString();
- }
-
- @Override
- public Integer restoreState(String stateSnapshot) {
- return Integer.parseInt(stateSnapshot);
- }
- });
+ groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true, intCheckpointer);
}
@SuppressWarnings("unchecked")
@@ -308,6 +299,21 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
}
}
+
+ public static StateCheckpointer<Integer, String> intCheckpointer = new StateCheckpointer<Integer, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
+ return state.toString();
+ }
+
+ @Override
+ public Integer restoreState(String stateSnapshot) {
+ return Integer.parseInt(stateSnapshot);
+ }
+ };
public static class PStateKeyRemovalTestMapper extends RichMapFunction<Integer, String> {
http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index d942a9e..aa3e9e4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.checkpointing;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Map;
@@ -30,18 +29,14 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
@@ -184,22 +179,47 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>();
- private OperatorState<Long> counts;
+ private OperatorState<NonSerializableLong> counts;
@Override
public void open(Configuration parameters) throws IOException {
- counts = getRuntimeContext().getOperatorState("count", 0L, true);
+ counts = getRuntimeContext().getOperatorState("count", NonSerializableLong.of(0L), true,
+ new StateCheckpointer<NonSerializableLong, String>() {
+
+ @Override
+ public String snapshotState(NonSerializableLong state, long id, long ts) {
+ return state.value.toString();
+ }
+
+ @Override
+ public NonSerializableLong restoreState(String stateSnapshot) {
+ return NonSerializableLong.of(Long.parseLong(stateSnapshot));
+ }
+
+ });
}
@Override
public void invoke(Tuple2<Integer, Long> value) throws Exception {
- long currentCount = counts.value() + 1;
- counts.update(currentCount);
+ long currentCount = counts.value().value + 1;
+ counts.update(NonSerializableLong.of(currentCount));
allCounts.put(value.f0, currentCount);
}
}
+ private static class NonSerializableLong {
+ public Long value;
+
+ private NonSerializableLong(long value) {
+ this.value = value;
+ }
+
+ public static NonSerializableLong of(long value) {
+ return new NonSerializableLong(value);
+ }
+ }
+
private static class IdentityKeySelector<T> implements KeySelector<T, T> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index d54d425..882634b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -308,7 +309,20 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
@Override
public void open(Configuration conf) throws IOException {
- this.count = getRuntimeContext().getOperatorState("count", 0L, false);
+ this.count = getRuntimeContext().getOperatorState("count", 0L, false,
+ new StateCheckpointer<Long, String>() {
+
+ @Override
+ public String snapshotState(Long state, long id, long ts) {
+ return state.toString();
+ }
+
+ @Override
+ public Long restoreState(String stateSnapshot) {
+ return Long.parseLong(stateSnapshot);
+ }
+
+ });
}
@Override