You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/24 13:33:13 UTC
flink git commit: [streaming] Copy default state values for partitioned states
Repository: flink
Updated Branches:
refs/heads/master 5d475f887 -> acb6a9366
[streaming] Copy default state values for partitioned states
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acb6a936
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acb6a936
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acb6a936
Branch: refs/heads/master
Commit: acb6a936632297a36e59d30e499626831b3eea82
Parents: 5d475f8
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Jul 24 10:48:49 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Jul 24 10:50:50 2015 +0200
----------------------------------------------------------------------
.../state/PartitionedStreamOperatorState.java | 41 +++++++++++++-------
.../api/state/StreamOperatorState.java | 2 +-
.../runtime/tasks/StreamingRuntimeContext.java | 4 +-
.../api/state/StatefulOperatorTest.java | 17 ++++----
4 files changed, 40 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/acb6a936/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index 372cb10..b22aed4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.PartitionedStateStore;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.util.InstantiationUtil;
/**
* Implementation of the {@link OperatorState} interface for partitioned user
@@ -49,36 +50,41 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
private final KeySelector<IN, Serializable> keySelector;
private final PartitionedStateStore<S, C> stateStore;
-
- private S defaultState;
+
+ private byte[] defaultState;
// The currently processed input, used to extract the appropriate key
private IN currentInput;
+ private ClassLoader cl;
+
public PartitionedStreamOperatorState(StateCheckpointer<S, C> checkpointer,
- StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector) {
+ StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
super(checkpointer, provider);
this.keySelector = keySelector;
this.stateStore = new EagerStateStore<S, C>(checkpointer, provider);
+ this.cl = cl;
}
-
+
@SuppressWarnings("unchecked")
public PartitionedStreamOperatorState(StateHandleProvider<C> provider,
- KeySelector<IN, Serializable> keySelector) {
- this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider, keySelector);
+ KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
+ this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider, keySelector, cl);
}
+ @SuppressWarnings("unchecked")
@Override
- public S value() throws IOException{
+ public S value() throws IOException {
if (currentInput == null) {
throw new IllegalStateException("Need a valid input for accessing the state.");
} else {
try {
Serializable key = keySelector.getKey(currentInput);
- if(stateStore.containsKey(key)){
+ if (stateStore.containsKey(key)) {
return stateStore.getStateForKey(key);
- }else{
- return defaultState;
+ } else {
+ return checkpointer.restoreState((C) InstantiationUtil.deserializeObject(
+ defaultState, cl));
}
} catch (Exception e) {
throw new RuntimeException("User-defined key selector threw an exception.", e);
@@ -101,10 +107,15 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
}
}
}
-
+
@Override
- public void setDefaultState(S defaultState){
- this.defaultState = defaultState;
+ public void setDefaultState(S defaultState) {
+ try {
+ this.defaultState = InstantiationUtil.serializeObject(checkpointer.snapshotState(
+ defaultState, 0, 0));
+ } catch (IOException e) {
+ throw new RuntimeException("Default state must be serializable.");
+ }
}
public void setCurrentInput(IN input) {
@@ -113,7 +124,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
@Override
public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId,
- long checkpointTimestamp) throws Exception{
+ long checkpointTimestamp) throws Exception {
return stateStore.snapshotStates(checkpointId, checkpointTimestamp);
}
@@ -126,7 +137,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
public Map<Serializable, S> getPartitionedState() throws Exception {
return stateStore.getPartitionedState();
}
-
+
@Override
public String toString() {
return stateStore.toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/acb6a936/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index a80d730..2724efb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -45,7 +45,7 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
public static final Serializable DEFAULTKEY = -1;
private S state;
- private StateCheckpointer<S, C> checkpointer;
+ protected StateCheckpointer<S, C> checkpointer;
private final StateHandleProvider<C> provider;
public StreamOperatorState(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
http://git-wip-us.apache.org/repos/asf/flink/blob/acb6a936/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 7518124..29671f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -53,6 +53,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
private final List<PartitionedStreamOperatorState> partitionedStates;
private final KeySelector<?, ?> statePartitioner;
private final StateHandleProvider<Serializable> provider;
+ private final ClassLoader cl;
@SuppressWarnings("unchecked")
public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
@@ -65,6 +66,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
this.states = new HashMap<String, StreamOperatorState>();
this.partitionedStates = new LinkedList<PartitionedStreamOperatorState>();
this.provider = (StateHandleProvider<Serializable>) provider;
+ this.cl = userCodeClassLoader;
}
/**
@@ -141,7 +143,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
public StreamOperatorState createRawState(boolean partitioned) {
if (partitioned) {
if (statePartitioner != null) {
- return new PartitionedStreamOperatorState(provider, statePartitioner);
+ return new PartitionedStreamOperatorState(provider, statePartitioner, cl);
} else {
throw new RuntimeException(
"A partitioning key must be provided for pastitioned state.");
http://git-wip-us.apache.org/repos/asf/flink/blob/acb6a936/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 6e22021..a7a8a09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -76,7 +77,7 @@ public class StatefulOperatorTest {
assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out);
assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).value());
- assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
+ assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
assertEquals("12345", context.getOperatorState("concat", "", false).value());
assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
@@ -86,7 +87,7 @@ public class StatefulOperatorTest {
StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();
assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
- assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
+ assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
out.clear();
@@ -95,7 +96,7 @@ public class StatefulOperatorTest {
assertEquals(Arrays.asList("7", "8"), out);
assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).value());
- assertEquals(ImmutableMap.of(0, 3, 1, 4), restoredContext.getOperatorStates().get("groupCounter")
+ assertEquals(ImmutableMap.of(0, new MutableInt(3), 1, new MutableInt(4)), restoredContext.getOperatorStates().get("groupCounter")
.getPartitionedState());
assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).value());
assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
@@ -179,7 +180,7 @@ public class StatefulOperatorTest {
Checkpointed<Integer> {
private static final long serialVersionUID = -9007873655253339356L;
OperatorState<Integer> counter;
- OperatorState<Integer> groupCounter;
+ OperatorState<MutableInt> groupCounter;
OperatorState<String> concat;
Integer checkpointedCounter = 0;
@@ -187,7 +188,9 @@ public class StatefulOperatorTest {
@Override
public String map(Integer value) throws Exception {
counter.update(counter.value() + 1);
- groupCounter.update(groupCounter.value() + 1);
+ MutableInt incremented = groupCounter.value();
+ incremented.increment();
+ groupCounter.update(incremented);
concat.update(concat.value() + value.toString());
checkpointedCounter++;
try {
@@ -201,7 +204,7 @@ public class StatefulOperatorTest {
@Override
public void open(Configuration conf) throws IOException {
counter = getRuntimeContext().getOperatorState("counter", 0, false);
- groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
+ groupCounter = getRuntimeContext().getOperatorState("groupCounter", new MutableInt(0), true);
concat = getRuntimeContext().getOperatorState("concat", "", false);
try {
getRuntimeContext().getOperatorState("test", null, true);
@@ -223,7 +226,7 @@ public class StatefulOperatorTest {
for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
Integer key = (Integer) count.getKey();
Integer expected = key < 3 ? 2 : 1;
- assertEquals(expected, count.getValue());
+ assertEquals(new MutableInt(expected), count.getValue());
}
}