You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/09/21 20:02:39 UTC

flink git commit: [FLINK-2713] [streaming] Set state restore to lazy to avoid StateCheckpointer issues and reduce checkpoint overhead

Repository: flink
Updated Branches:
  refs/heads/master b9663c407 -> 63d9800ee


[FLINK-2713] [streaming] Set state restore to lazy to avoid StateCheckpointer issues and reduce checkpoint overhead

Closes #1154


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63d9800e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63d9800e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63d9800e

Branch: refs/heads/master
Commit: 63d9800eee5f99e11f6e9775908a9b62f048c677
Parents: b9663c4
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Sep 21 12:42:15 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Mon Sep 21 20:01:39 2015 +0200

----------------------------------------------------------------------
 .../state/PartitionedStreamOperatorState.java   | 38 +++++++++++++++++--
 .../api/state/StreamOperatorState.java          | 32 +++++++++++++---
 .../api/state/StatefulOperatorTest.java         | 38 +++++++++++--------
 .../PartitionedStateCheckpointingITCase.java    | 40 +++++++++++++++-----
 .../StreamCheckpointingITCase.java              | 16 +++++++-
 5 files changed, 127 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index 115a97c..408a0f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -55,6 +55,8 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 	private IN currentInput;
 
 	private ClassLoader cl;
+	private boolean restored = true;
+	private StateHandle<Serializable> checkpoint = null;
 
 	public PartitionedStreamOperatorState(StateCheckpointer<S, C> checkpointer,
 			StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
@@ -76,6 +78,10 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 		if (currentInput == null) {
 			throw new IllegalStateException("Need a valid input for accessing the state.");
 		} else {
+			if (!restored) {
+				// If the state is not restored yet, restore now
+				restoreWithCheckpointer();
+			}
 			Serializable key;
 			try {
 				key = keySelector.getKey(currentInput);
@@ -100,6 +106,10 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 		if (currentInput == null) {
 			throw new IllegalStateException("Need a valid input for updating a state.");
 		} else {
+			if (!restored) {
+				// If the state is not restored yet, restore now
+				restoreWithCheckpointer();
+			}
 			Serializable key;
 			try {
 				key = keySelector.getKey(currentInput);
@@ -131,18 +141,38 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 
 	@Override
 	public StateHandle<Serializable> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		return stateStore.snapshotStates(checkpointId, checkpointTimestamp);
+		// If the state is restored we take a snapshot, otherwise return the last checkpoint
+		return restored ? stateStore.snapshotStates(checkpointId, checkpointTimestamp) : provider
+				.createStateHandle(checkpoint.getState(cl));
 	}
-
+	
 	@Override
-	public void restoreState(StateHandle<Serializable> snapshots, ClassLoader userCodeClassLoader) throws Exception {
-		stateStore.restoreStates(snapshots, userCodeClassLoader);
+	public void restoreState(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception {
+		// We store the snapshot for lazy restore
+		checkpoint = snapshot;
+		restored = false;
+	}
+	
+	private void restoreWithCheckpointer() throws IOException {
+		try {
+			stateStore.restoreStates(checkpoint, cl);
+		} catch (Exception e) {
+			throw new IOException(e);
+		}
+		restored = true;
+		checkpoint = null;
 	}
 
 	@Override
 	public Map<Serializable, S> getPartitionedState() throws Exception {
 		return stateStore.getPartitionedState();
 	}
+	
+	@Override
+	public void setCheckpointer(StateCheckpointer<S, C> checkpointer) {
+		super.setCheckpointer(checkpointer);
+		stateStore.setCheckPointer(checkpointer);
+	}
 
 	@Override
 	public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index 29a19b5..c33b94e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -44,7 +44,10 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 
 	private S state;
 	protected StateCheckpointer<S, C> checkpointer;
-	private final StateHandleProvider<Serializable> provider;
+	protected final StateHandleProvider<Serializable> provider;
+	
+	private boolean restored = true;
+	private Serializable checkpoint = null;
 
 	@SuppressWarnings("unchecked")
 	public StreamOperatorState(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
@@ -59,6 +62,10 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 
 	@Override
 	public S value() throws IOException {
+		if (!restored) {
+			// If the state is not restored yet, restore at this point
+			restoreWithCheckpointer();
+		}
 		return state;
 	}
 
@@ -67,6 +74,11 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 		if (state == null) {
 			throw new RuntimeException("Cannot set state to null.");
 		}
+		if (!restored) {
+			// An update before the lazy restore overwrites the pending checkpoint, so discard it
+			restored = true;
+			checkpoint = null;
+		}
 		this.state = state;
 	}
 	
@@ -90,14 +102,22 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 
 	public StateHandle<Serializable> snapshotState(long checkpointId, long checkpointTimestamp)
 			throws Exception {
-		return provider.createStateHandle(checkpointer.snapshotState(value(), checkpointId,
-				checkpointTimestamp));
-
+		// If the state is restored we take a snapshot, otherwise return the last checkpoint
+		return provider.createStateHandle(restored ? checkpointer.snapshotState(value(), checkpointId,
+				checkpointTimestamp) : checkpoint);
 	}
 
-	@SuppressWarnings("unchecked")
 	public void restoreState(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception {
-		update(checkpointer.restoreState((C) snapshot.getState(userCodeClassLoader)));
+		// We set the checkpoint for lazy restore
+		checkpoint = snapshot.getState(userCodeClassLoader);
+		restored = false;
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void restoreWithCheckpointer() throws IOException {
+		update(checkpointer.restoreState((C) checkpoint));
+		restored = true;
+		checkpoint = null;
 	}
 
 	public Map<Serializable, S> getPartitionedState() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 76d9e16..4aec723 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -83,11 +83,14 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 		assertEquals("12345", context.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
 
-		byte[] serializedState = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
+		byte[] serializedState0 = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
+		// Restore state but snapshot again before calling the value
+		byte[] serializedState = InstantiationUtil.serializeObject(createOperatorWithContext(out,
+				new ModKey(2), serializedState0).getStateSnapshotFromFunction(1, 1));
 
 		StreamMap<Integer, String> restoredMap = createOperatorWithContext(out, new ModKey(2), serializedState);
 		StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();
-
+		
 		assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
 		assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
 		assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
@@ -227,7 +230,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 
 		@Override
 		public void open(Configuration conf) throws IOException {
-			counter = getRuntimeContext().getOperatorState("counter", 0, false);
+			counter = getRuntimeContext().getOperatorState("counter", 0, false, intCheckpointer);
 			groupCounter = getRuntimeContext().getOperatorState("groupCounter", new MutableInt(0), true);
 			concat = getRuntimeContext().getOperatorState("concat", "", false);
 			try {
@@ -279,19 +282,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 
 		@Override
 		public void open(Configuration conf) throws IOException {
-			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true,
-					new StateCheckpointer<Integer, String>() {
-
-						@Override
-						public String snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
-							return state.toString();
-						}
-
-						@Override
-						public Integer restoreState(String stateSnapshot) {
-							return Integer.parseInt(stateSnapshot);
-						}
-					});
+			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true, intCheckpointer);
 		}
 
 		@SuppressWarnings("unchecked")
@@ -308,6 +299,21 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 		}
 
 	}
+	
+	public static StateCheckpointer<Integer, String> intCheckpointer = new StateCheckpointer<Integer, String>() {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
+			return state.toString();
+		}
+
+		@Override
+		public Integer restoreState(String stateSnapshot) {
+			return Integer.parseInt(stateSnapshot);
+		}
+	};
 
 	public static class PStateKeyRemovalTestMapper extends RichMapFunction<Integer, String> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index d942a9e..aa3e9e4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.checkpointing;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Map;
@@ -30,18 +29,14 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
@@ -184,22 +179,47 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 
 		private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>();
 
-		private OperatorState<Long> counts;
+		private OperatorState<NonSerializableLong> counts;
 
 		@Override
 		public void open(Configuration parameters) throws IOException {
-			counts = getRuntimeContext().getOperatorState("count", 0L, true);
+			counts = getRuntimeContext().getOperatorState("count", NonSerializableLong.of(0L), true,
+					new StateCheckpointer<NonSerializableLong, String>() {
+
+						@Override
+						public String snapshotState(NonSerializableLong state, long id, long ts) {
+							return state.value.toString();
+						}
+
+						@Override
+						public NonSerializableLong restoreState(String stateSnapshot) {
+							return NonSerializableLong.of(Long.parseLong(stateSnapshot));
+						}
+
+					});
 		}
 
 		@Override
 		public void invoke(Tuple2<Integer, Long> value) throws Exception {
-			long currentCount = counts.value() + 1;
-			counts.update(currentCount);
+			long currentCount = counts.value().value + 1;
+			counts.update(NonSerializableLong.of(currentCount));
 			allCounts.put(value.f0, currentCount);
 
 		}
 	}
 	
+	private static class NonSerializableLong {
+		public Long value;
+
+		private NonSerializableLong(long value) {
+			this.value = value;
+		}
+
+		public static NonSerializableLong of(long value) {
+			return new NonSerializableLong(value);
+		}
+	}
+	
 	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/63d9800e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index d54d425..882634b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -308,7 +309,20 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		
 		@Override
 		public void open(Configuration conf) throws IOException {
-			this.count = getRuntimeContext().getOperatorState("count", 0L, false);
+			this.count = getRuntimeContext().getOperatorState("count", 0L, false,
+					new StateCheckpointer<Long, String>() {
+
+						@Override
+						public String snapshotState(Long state, long id, long ts) {
+							return state.toString();
+						}
+
+						@Override
+						public Long restoreState(String stateSnapshot) {
+							return Long.parseLong(stateSnapshot);
+						}
+
+					});
 		}
 
 		@Override