You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/24 13:33:13 UTC

flink git commit: [streaming] Copy default state values for partitioned states

Repository: flink
Updated Branches:
  refs/heads/master 5d475f887 -> acb6a9366


[streaming] Copy default state values for partitioned states


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acb6a936
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acb6a936
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acb6a936

Branch: refs/heads/master
Commit: acb6a936632297a36e59d30e499626831b3eea82
Parents: 5d475f8
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Jul 24 10:48:49 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Jul 24 10:50:50 2015 +0200

----------------------------------------------------------------------
 .../state/PartitionedStreamOperatorState.java   | 41 +++++++++++++-------
 .../api/state/StreamOperatorState.java          |  2 +-
 .../runtime/tasks/StreamingRuntimeContext.java  |  4 +-
 .../api/state/StatefulOperatorTest.java         | 17 ++++----
 4 files changed, 40 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/acb6a936/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index 372cb10..b22aed4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.PartitionedStateStore;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.util.InstantiationUtil;
 
 /**
  * Implementation of the {@link OperatorState} interface for partitioned user
@@ -49,36 +50,41 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 	private final KeySelector<IN, Serializable> keySelector;
 
 	private final PartitionedStateStore<S, C> stateStore;
-	
-	private S defaultState;
+
+	private byte[] defaultState;
 
 	// The currently processed input, used to extract the appropriate key
 	private IN currentInput;
 
+	private ClassLoader cl;
+
 	public PartitionedStreamOperatorState(StateCheckpointer<S, C> checkpointer,
-			StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector) {
+			StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
 		super(checkpointer, provider);
 		this.keySelector = keySelector;
 		this.stateStore = new EagerStateStore<S, C>(checkpointer, provider);
+		this.cl = cl;
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public PartitionedStreamOperatorState(StateHandleProvider<C> provider,
-			KeySelector<IN, Serializable> keySelector) {
-		this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider, keySelector);
+			KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
+		this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider, keySelector, cl);
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
-	public S value() throws IOException{
+	public S value() throws IOException {
 		if (currentInput == null) {
 			throw new IllegalStateException("Need a valid input for accessing the state.");
 		} else {
 			try {
 				Serializable key = keySelector.getKey(currentInput);
-				if(stateStore.containsKey(key)){
+				if (stateStore.containsKey(key)) {
 					return stateStore.getStateForKey(key);
-				}else{
-					return defaultState;
+				} else {
+					return checkpointer.restoreState((C) InstantiationUtil.deserializeObject(
+							defaultState, cl));
 				}
 			} catch (Exception e) {
 				throw new RuntimeException("User-defined key selector threw an exception.", e);
@@ -101,10 +107,15 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 			}
 		}
 	}
-	
+
 	@Override
-	public void setDefaultState(S defaultState){
-		this.defaultState = defaultState;
+	public void setDefaultState(S defaultState) {
+		try {
+			this.defaultState = InstantiationUtil.serializeObject(checkpointer.snapshotState(
+					defaultState, 0, 0));
+		} catch (IOException e) {
+			throw new RuntimeException("Default state must be serializable.");
+		}
 	}
 
 	public void setCurrentInput(IN input) {
@@ -113,7 +124,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 
 	@Override
 	public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId,
-			long checkpointTimestamp) throws Exception{
+			long checkpointTimestamp) throws Exception {
 		return stateStore.snapshotStates(checkpointId, checkpointTimestamp);
 	}
 
@@ -126,7 +137,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 	public Map<Serializable, S> getPartitionedState() throws Exception {
 		return stateStore.getPartitionedState();
 	}
-	
+
 	@Override
 	public String toString() {
 		return stateStore.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/acb6a936/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index a80d730..2724efb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -45,7 +45,7 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 	public static final Serializable DEFAULTKEY = -1;
 
 	private S state;
-	private StateCheckpointer<S, C> checkpointer;
+	protected StateCheckpointer<S, C> checkpointer;
 	private final StateHandleProvider<C> provider;
 
 	public StreamOperatorState(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {

http://git-wip-us.apache.org/repos/asf/flink/blob/acb6a936/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 7518124..29671f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -53,6 +53,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	private final List<PartitionedStreamOperatorState> partitionedStates;
 	private final KeySelector<?, ?> statePartitioner;
 	private final StateHandleProvider<Serializable> provider;
+	private final ClassLoader cl;
 
 	@SuppressWarnings("unchecked")
 	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
@@ -65,6 +66,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 		this.states = new HashMap<String, StreamOperatorState>();
 		this.partitionedStates = new LinkedList<PartitionedStreamOperatorState>();
 		this.provider = (StateHandleProvider<Serializable>) provider;
+		this.cl = userCodeClassLoader;
 	}
 
 	/**
@@ -141,7 +143,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	public StreamOperatorState createRawState(boolean partitioned) {
 		if (partitioned) {
 			if (statePartitioner != null) {
-				return new PartitionedStreamOperatorState(provider, statePartitioner);
+				return new PartitionedStreamOperatorState(provider, statePartitioner, cl);
 			} else {
 				throw new RuntimeException(
 						"A partitioning key must be provided for pastitioned state.");

http://git-wip-us.apache.org/repos/asf/flink/blob/acb6a936/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 6e22021..a7a8a09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -76,7 +77,7 @@ public class StatefulOperatorTest {
 
 		assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out);
 		assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).value());
-		assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
+		assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
 		assertEquals("12345", context.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
 
@@ -86,7 +87,7 @@ public class StatefulOperatorTest {
 		StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();
 
 		assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
-		assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
+		assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
 		assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
 		out.clear();
@@ -95,7 +96,7 @@ public class StatefulOperatorTest {
 
 		assertEquals(Arrays.asList("7", "8"), out);
 		assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).value());
-		assertEquals(ImmutableMap.of(0, 3, 1, 4), restoredContext.getOperatorStates().get("groupCounter")
+		assertEquals(ImmutableMap.of(0, new MutableInt(3), 1, new MutableInt(4)), restoredContext.getOperatorStates().get("groupCounter")
 				.getPartitionedState());
 		assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
@@ -179,7 +180,7 @@ public class StatefulOperatorTest {
 			Checkpointed<Integer> {
 	private static final long serialVersionUID = -9007873655253339356L;
 		OperatorState<Integer> counter;
-		OperatorState<Integer> groupCounter;
+		OperatorState<MutableInt> groupCounter;
 		OperatorState<String> concat;
 		
 		Integer checkpointedCounter = 0;
@@ -187,7 +188,9 @@ public class StatefulOperatorTest {
 		@Override
 		public String map(Integer value) throws Exception {
 			counter.update(counter.value() + 1);
-			groupCounter.update(groupCounter.value() + 1);
+			MutableInt incremented = groupCounter.value();
+			incremented.increment();
+			groupCounter.update(incremented);
 			concat.update(concat.value() + value.toString());
 			checkpointedCounter++;
 			try {
@@ -201,7 +204,7 @@ public class StatefulOperatorTest {
 		@Override
 		public void open(Configuration conf) throws IOException {
 			counter = getRuntimeContext().getOperatorState("counter", 0, false);
-			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
+			groupCounter = getRuntimeContext().getOperatorState("groupCounter", new MutableInt(0), true);
 			concat = getRuntimeContext().getOperatorState("concat", "", false);
 			try {
 				getRuntimeContext().getOperatorState("test", null, true);
@@ -223,7 +226,7 @@ public class StatefulOperatorTest {
 			for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
 				Integer key = (Integer) count.getKey();
 				Integer expected = key < 3 ? 2 : 1;
-				assertEquals(expected, count.getValue());
+				assertEquals(new MutableInt(expected), count.getValue());
 			}
 		}