You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:12 UTC
[12/14] flink git commit: [FLINK-1638] [streaming] State interface cleanup
[FLINK-1638] [streaming] State interface cleanup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf49ebbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf49ebbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf49ebbb
Branch: refs/heads/master
Commit: cf49ebbb488822c2dea7f59d35dbcc4a85da88c5
Parents: 09aa841
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Mar 5 14:53:11 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100
----------------------------------------------------------------------
.../flink/runtime/state/OperatorState.java | 54 ++---
.../flink/runtime/state/StateCheckpoint.java | 27 +--
.../connectors/kafka/KafkaConsumerExample.java | 4 +-
.../kafka/api/simple/PersistentKafkaSource.java | 26 ++-
.../kafka/api/simple/SimpleKafkaSource.java | 2 +-
.../streamvertex/StreamingRuntimeContext.java | 12 +-
.../flink/streaming/state/GraphState.java | 61 ------
.../apache/flink/streaming/state/MapState.java | 203 -------------------
.../streaming/state/PartitionableState.java | 4 +
.../flink/streaming/state/SimpleState.java | 54 -----
.../state/checkpoint/MapCheckpoint.java | 73 -------
.../flink/streaming/state/MapStateTest.java | 130 ------------
.../streaming/state/OperatorStateTest.java | 21 +-
13 files changed, 59 insertions(+), 612 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
index 74ea1a7..8dfd715 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
@@ -23,25 +23,15 @@ import java.io.Serializable;
* Abstract class for representing operator states in Flink programs. By
* implementing the methods declared in this abstraction the state of the
* operator can be checkpointed by the fault tolerance mechanism.
- *
+ *
* @param <T>
* The type of the operator state.
*/
-public abstract class OperatorState<T> implements Serializable {
+public class OperatorState<T> implements Serializable {
private static final long serialVersionUID = 1L;
- public T state;
-
- /**
- * Constructor used for initializing the state. In case of failure, the
- * state will be reinitialized using this constructor, then
- * {@link #restore(StateCheckpoint)} will be used to restore from the last
- * available backup.
- */
- public OperatorState() {
- state = null;
- }
+ private T stateObject;
/**
* Initializes the state using the given state object.
@@ -50,7 +40,7 @@ public abstract class OperatorState<T> implements Serializable {
* The initial state object
*/
public OperatorState(T initialState) {
- state = initialState;
+ stateObject = initialState;
}
/**
@@ -59,45 +49,39 @@ public abstract class OperatorState<T> implements Serializable {
* @return The state.
*/
public T getState() {
- return state;
+ return stateObject;
}
/**
- * Sets the current state object.
+ * Updates the current state object. States should be only updated using
+ * this method to avoid concurrency issues.
*
- * @param state
- * The new state object.
- * @return The operator state with the new state object set.
+ * @param stateUpdate
+ * The update applied.
*/
- public OperatorState<T> setState(T state) {
- this.state = state;
- return this;
+ @SuppressWarnings("unchecked")
+ public synchronized void update(Object stateUpdate) {
+ this.stateObject = (T) stateUpdate;
}
/**
* Creates a {@link StateCheckpoint} that will be used to backup the state
- * for failure recovery.
+ * for failure recovery. This method will be called by the state
+ * checkpointer.
*
* @return The {@link StateCheckpoint} created.
*/
- public abstract StateCheckpoint<T> checkpoint();
-
- /**
- * Restores the state from the given {@link StateCheckpoint}.
- *
- * @param checkpoint
- * The checkpoint to restore from
- * @return The restored operator.
- */
- public abstract OperatorState<T> restore(StateCheckpoint<T> checkpoint);
+ public synchronized StateCheckpoint<T> checkpoint() {
+ return new StateCheckpoint<T>(this);
+ }
@Override
public String toString() {
- return state.toString();
+ return stateObject.toString();
}
public boolean stateEquals(OperatorState<T> other) {
- return state.equals(other.state);
+ return stateObject.equals(other.stateObject);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
index 4e4906e..a872e5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
@@ -32,7 +32,7 @@ public class StateCheckpoint<T> implements Serializable {
private static final long serialVersionUID = 1L;
- public T checkpointedState;
+ public OperatorState<T> checkpointedState;
/**
* Creates a state checkpoint from the given {@link OperatorState}
@@ -41,34 +41,13 @@ public class StateCheckpoint<T> implements Serializable {
* The {@link OperatorState} to checkpoint.
*/
public StateCheckpoint(OperatorState<T> operatorState) {
- this.checkpointedState = operatorState.getState();
+ this.checkpointedState = operatorState;
}
- public StateCheckpoint() {
- this.checkpointedState = null;
- }
-
- /**
- * Returns the state object for the checkpoint.
- *
- * @return The checkpointed state object.
- */
- public T getCheckpointedState() {
+ public OperatorState<T> restore() {
return checkpointedState;
}
- /**
- * Updates the checkpoint from next one. Override this method to allow
- * incremental updates.
- *
- * @param nextCheckpoint
- * The {@link StateCheckpoint} will be used to update from.
- */
- public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) {
- this.checkpointedState = nextCheckpoint.getCheckpointedState();
- return this;
- }
-
@Override
public String toString() {
return checkpointedState.toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index d9b03c9..0a0c623 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -17,11 +17,11 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
-import org.apache.flink.streaming.state.SimpleState;
public class KafkaConsumerExample {
@@ -43,7 +43,7 @@ public class KafkaConsumerExample {
// new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
// new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
new PersistentKafkaSource<String>(topic, host, port, 10L, new JavaDefaultStringSchema()))
- .registerState("kafka", new SimpleState<Long>())
+ .registerState("kafka", new OperatorState<Long>(null))
.setParallelism(3)
.print().setParallelism(3);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 00d003a..5f15d12 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -18,29 +18,29 @@
package org.apache.flink.streaming.connectors.kafka.api.simple;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
-import org.apache.flink.streaming.state.SimpleState;
import org.apache.flink.util.Collector;
public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
- private static final long NUM_RECORDS_PER_CHECKPOINT = 1000;
+ private static final long serialVersionUID = 1L;
private long initialOffset;
- private transient SimpleState<Long> kafkaOffSet;
- private transient long checkpointCounter;
+ private transient OperatorState<Long> kafkaOffSet;
/**
* Partition index is set automatically by instance id.
- *
+ *
* @param topicId
* @param host
* @param port
* @param deserializationSchema
*/
- public PersistentKafkaSource(String topicId, String host, int port, long initialOffset, DeserializationSchema<OUT> deserializationSchema) {
+ public PersistentKafkaSource(String topicId, String host, int port, long initialOffset,
+ DeserializationSchema<OUT> deserializationSchema) {
super(topicId, host, port, deserializationSchema);
this.initialOffset = initialOffset;
}
@@ -48,15 +48,16 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
@Override
public void open(Configuration parameters) {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- SimpleState<Long> lastKafkaOffSet = (SimpleState<Long>) context.getState("kafka");
+ @SuppressWarnings("unchecked")
+ OperatorState<Long> lastKafkaOffSet = (OperatorState<Long>) context.getState("kafka");
- if (lastKafkaOffSet.getState() == null){
- kafkaOffSet = new SimpleState<Long>(initialOffset);
+ if (lastKafkaOffSet.getState() == null) {
+ kafkaOffSet = new OperatorState<Long>(initialOffset);
+ context.registerState("kafka", kafkaOffSet);
} else {
kafkaOffSet = lastKafkaOffSet;
}
- checkpointCounter = 0;
super.open(parameters);
}
@@ -78,10 +79,7 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
gotMessage(msg);
OUT out = schema.deserialize(msg.getMessage());
collector.collect(out);
- if (checkpointCounter > NUM_RECORDS_PER_CHECKPOINT){
- kafkaOffSet = new SimpleState<Long>(msg.getOffset());
- kafkaOffSet.checkpoint();
- }
+ kafkaOffSet.update(msg.getOffset());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
index 473585c..fa925b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -46,6 +46,7 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
}
private void initializeConnection() {
+ //TODO: Fix this
int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
iterator = new KafkaConsumerIterator(host, port, topicId, 0, 100L);
}
@@ -58,7 +59,6 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
protected void gotMessage(MessageWithOffset msg) {
}
- @SuppressWarnings("unchecked")
@Override
public void run(Collector<OUT> collector) throws Exception {
while (iterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index a47100b..60dfe7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -40,8 +40,8 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig, Map<String, OperatorState<?>> operatorStates) {
- super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
- userCodeClassLoader, executionConfig, env.getCopyTask());
+ super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
+ executionConfig, env.getCopyTask());
this.env = env;
this.operatorStates = operatorStates;
}
@@ -67,6 +67,14 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
}
+ public void registerState(String name, OperatorState<?> state) {
+ if (state == null) {
+ throw new RuntimeException("Cannot register null state");
+ } else {
+ operatorStates.put(name, state);
+ }
+ }
+
/**
* Returns the input split provider associated with the operator.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/GraphState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/GraphState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/GraphState.java
deleted file mode 100644
index 945c0fd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/GraphState.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * GraphState represents a special {@link MapState} for storing graph
- * structures.
- *
- */
-public class GraphState extends MapState<Integer, Set<Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- public GraphState() {
- state = new HashMap<Integer, Set<Integer>>();
- }
-
- public void insertDirectedEdge(int sourceNode, int targetNode) {
- if (!containsKey(sourceNode)) {
- state.put(sourceNode, new HashSet<Integer>());
- }
- state.get(sourceNode).add(targetNode);
- updatedItems.add(sourceNode);
- removedItems.remove(sourceNode);
- }
-
- public void insertUndirectedEdge(int sourceNode, int targetNode) {
- if (!state.containsKey(sourceNode)) {
- state.put(sourceNode, new HashSet<Integer>());
- }
- if (!state.containsKey(targetNode)) {
- state.put(targetNode, new HashSet<Integer>());
- }
- state.get(sourceNode).add(targetNode);
- state.get(targetNode).add(sourceNode);
-
- updatedItems.add(sourceNode);
- removedItems.remove(sourceNode);
- updatedItems.add(targetNode);
- removedItems.remove(targetNode);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
deleted file mode 100644
index 1b861f5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
-import org.apache.flink.runtime.state.StateCheckpoint;
-
-
-/**
- * A Map that can be used as a partitionable operator state, for both fault
- * tolerance and rebalancing in Flink programs. The default repartitioner for
- * this operator repartitions by the hashcodes of the keys in the map. </br>
- * </br> The MapState also allows for incremental (data efficient) checkpointing
- * of the state. The entries in the map should only be modified by using the
- * dedicated methods: {@link #put(Object, Object)},{@link #remove(Object)},
- * {@link #putAll(Map)} and {@link #clear}. Directly modifying the entryset
- * will cause errors when checkpointing.
- *
- * @param <K>
- * The type of the keys.
- * @param <V>
- * The type of the values.
- */
-public class MapState<K, V> extends PartitionableState<Map<K, V>> implements Map<K, V> {
-
- private static final long serialVersionUID = 1L;
-
- protected Set<K> removedItems;
- protected Set<K> updatedItems;
- protected boolean clear;
-
- public MapState() {
- this.state = new HashMap<K, V>();
- this.updatedItems = new HashSet<K>();
- this.removedItems = new HashSet<K>();
- }
-
- @Override
- public StateCheckpoint<Map<K, V>> checkpoint() {
- this.updatedItems.removeAll(removedItems);
- StateCheckpoint<Map<K, V>> checkpoint = new MapCheckpoint<K, V>(this);
- resetHistory();
- return checkpoint;
- }
-
- @Override
- public OperatorState<Map<K, V>> restore(StateCheckpoint<Map<K, V>> checkpoint) {
- this.state = checkpoint.getCheckpointedState();
- resetHistory();
- return this;
- }
-
- @Override
- public OperatorState<Map<K, V>>[] repartition(int numberOfPartitions) {
- @SuppressWarnings("unchecked")
- MapState<K, V>[] states = new MapState[numberOfPartitions];
- for (int i = 0; i < numberOfPartitions; i++) {
- states[i] = new MapState<K, V>();
- }
- for (Entry<K, V> entry : this.state.entrySet()) {
- int partition = Math.abs(entry.getKey().hashCode() % numberOfPartitions);
- states[partition].state.put(entry.getKey(), entry.getValue());
- }
- return states;
- }
-
- @Override
- public OperatorState<Map<K, V>> reBuild(OperatorState<Map<K, V>>... parts) {
- clear();
- for (OperatorState<Map<K, V>> operatorState : parts) {
- putAll(operatorState.state);
- }
- return this;
- }
-
- public void resetHistory() {
- clear = false;
- removedItems.clear();
- updatedItems.clear();
- }
-
- @Override
- public int size() {
- return state.size();
- }
-
- @Override
- public boolean isEmpty() {
- return state.isEmpty();
- }
-
- @Override
- public boolean containsKey(Object key) {
- return state.containsKey(key);
- }
-
- @Override
- public boolean containsValue(Object value) {
- return state.containsValue(value);
- }
-
- @Override
- public V get(Object key) {
- return state.get(key);
- }
-
- @Override
- public V put(K key, V value) {
- updatedItems.add(key);
- removedItems.remove(key);
- return state.put(key, value);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public V remove(Object key) {
- V removed = state.remove(key);
- if (removed != null) {
- removedItems.add((K) key);
- updatedItems.remove((K) key);
- }
-
- return removed;
- }
-
- @Override
- public void putAll(Map<? extends K, ? extends V> m) {
- for (K key : m.keySet()) {
- updatedItems.add(key);
- removedItems.remove(key);
- }
- state.putAll(m);
- }
-
- @Override
- public void clear() {
- clear = true;
- updatedItems.clear();
- removedItems.clear();
- state.clear();
- }
-
- @Override
- public Set<K> keySet() {
- return state.keySet();
- }
-
- @Override
- public Collection<V> values() {
- return state.values();
- }
-
- @Override
- public Set<Entry<K, V>> entrySet() {
- return state.entrySet();
- }
-
- @Override
- public Map<K, V> getState() {
- throw new RuntimeException(
- "State object should be accessed using the Map interface provided by the MapState");
- }
-
- @Override
- public OperatorState<Map<K, V>> setState(Map<K, V> state) {
- throw new RuntimeException(
- "State object should be accessed using the Map interface provided by the MapState");
- }
-
- public Set<K> getRemovedItems() {
- return removedItems;
- }
-
- public Set<K> getUpdatedItems() {
- return updatedItems;
- }
-
- public boolean isCleared() {
- return clear;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
index ddedcd9..a5e67ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
@@ -28,6 +28,10 @@ import org.apache.flink.runtime.state.OperatorState;
*/
public abstract class PartitionableState<T> extends OperatorState<T> {
+ public PartitionableState(T initialState) {
+ super(initialState);
+ }
+
private static final long serialVersionUID = 1L;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
deleted file mode 100644
index b76f5ac..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-
-/**
- * Basic {@link OperatorState} for storing and updating simple objects. By default the
- * whole stored object is checkpointed at each backup. Override checkpoint to
- * allow a more fine-grained behavior.
- *
- * @param <T>
- * The type of the stored state object.
- */
-public class SimpleState<T> extends OperatorState<T> {
-
- private static final long serialVersionUID = 1L;
-
- public SimpleState() {
- super();
- }
-
- public SimpleState(T initialState) {
- super(initialState);
- }
-
- @Override
- public StateCheckpoint<T> checkpoint() {
- return new StateCheckpoint<T>(this);
- }
-
- @Override
- public OperatorState<T> restore(StateCheckpoint<T> checkpoint) {
- this.state = checkpoint.getCheckpointedState();
- return this;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
deleted file mode 100644
index ee27d4f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state.checkpoint;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.apache.flink.streaming.state.MapState;
-
-public class MapCheckpoint<K, V> extends StateCheckpoint<Map<K, V>> {
-
- private static final long serialVersionUID = 1L;
-
- protected Set<K> removedItems;
- protected Map<K, V> updatedItems;
- protected boolean clear;
-
- @SuppressWarnings("unchecked")
- public MapCheckpoint(OperatorState<Map<K, V>> operatorState) {
- if (operatorState instanceof MapState) {
- MapState<K, V> mapState = (MapState<K, V>) operatorState;
-
- this.removedItems = mapState.getRemovedItems();
- this.clear = mapState.isCleared();
-
- this.updatedItems = new HashMap<K, V>();
- for (K key : mapState.getUpdatedItems()) {
- this.updatedItems.put(key, mapState.get(key));
- }
- this.checkpointedState = this.updatedItems;
-
- } else {
- throw new RuntimeException("MapCheckpoint can only be used with MapState");
- }
-
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public StateCheckpoint<Map<K, V>> update(StateCheckpoint<Map<K, V>> nextCheckpoint) {
- MapCheckpoint<K, V> mapCheckpoint = (MapCheckpoint<K, V>) nextCheckpoint;
- if (this.checkpointedState == null) {
- this.checkpointedState = mapCheckpoint.updatedItems;
- } else {
- if (mapCheckpoint.clear) {
- this.checkpointedState.clear();
- }
- for (Object key : mapCheckpoint.removedItems) {
- this.checkpointedState.remove(key);
- }
- this.checkpointedState.putAll(mapCheckpoint.updatedItems);
- }
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
deleted file mode 100644
index 98bafe4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.junit.Test;
-
-public class MapStateTest {
-
- @Test
- public void testMapState() {
-
- Map<String, Integer> map = new HashMap<String, Integer>();
- map.put("a", 1);
- map.put("b", 2);
- map.put("c", 3);
- map.remove("a");
-
- MapState<String, Integer> mapState = new MapState<String, Integer>();
- mapState.put("a", 1);
- mapState.put("b", 2);
- mapState.put("c", 3);
-
- assertEquals(1, (int) mapState.remove("a"));
- assertEquals(null, mapState.remove("a"));
- assertEquals(2, mapState.size());
- assertEquals(map, mapState.state);
- assertEquals(map.entrySet(), mapState.entrySet());
- assertTrue(mapState.containsKey("b"));
- assertFalse(mapState.containsKey("a"));
-
- assertEquals(2, mapState.updatedItems.size());
- assertEquals(1, mapState.removedItems.size());
-
- Map<String, Integer> map2 = new HashMap<String, Integer>();
- map2.put("a", 0);
- map2.put("e", -1);
-
- mapState.putAll(map2);
-
- assertEquals(4, mapState.updatedItems.size());
- assertEquals(0, mapState.removedItems.size());
-
- mapState.clear();
- assertEquals(new HashMap<String, Integer>(), mapState.state);
- assertTrue(mapState.clear);
- assertEquals(0, mapState.updatedItems.size());
- assertEquals(0, mapState.removedItems.size());
-
- mapState.putAll(map);
- assertEquals(map.keySet(), mapState.updatedItems);
-
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMapStateCheckpointing() {
-
- Map<String, Integer> map = new HashMap<String, Integer>();
- map.put("a", 1);
- map.put("b", 2);
- map.put("c", 3);
-
- MapState<String, Integer> mapState = new MapState<String, Integer>();
- mapState.putAll(map);
-
- StateCheckpoint<Map<String, Integer>> mcp = mapState.checkpoint();
- assertEquals(map, mcp.getCheckpointedState());
-
- Map<String, Integer> map2 = new HashMap<String, Integer>();
- map2.put("a", 0);
- map2.put("e", -1);
-
- mapState.put("a", 0);
- mapState.put("e", -1);
- mapState.remove("b");
- StateCheckpoint<Map<String, Integer>> mcp2 = new MapCheckpoint<String, Integer>(mapState);
- assertEquals(map2, mcp2.getCheckpointedState());
- mcp.update(mcp2);
- assertEquals(mapState.state, mcp.getCheckpointedState());
-
- mapState.clear();
- mapState.put("a", 1);
- mapState.put("a", 2);
- mapState.put("b", -3);
- mapState.put("c", 0);
- mapState.remove("b");
- mcp.update(mapState.checkpoint());
- assertEquals(mapState.state, mcp.getCheckpointedState());
-
- MapState<String, Integer> mapState2 = (MapState<String, Integer>) new MapState<String, Integer>()
- .restore(mcp);
-
- assertTrue(mapState2.stateEquals(mapState));
- mapState2.reBuild(mapState2.repartition(10));
- assertTrue(mapState2.stateEquals(mapState));
-
- MapState<Integer, Integer> mapState3 = new MapState<Integer, Integer>();
- mapState3.put(1, 1);
- mapState3.put(2, 1);
-
- mapState3.reBuild(mapState3.repartition(2)[0]);
- assertTrue(mapState3.containsKey(2));
- assertFalse(mapState3.containsKey(1));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
index 4e07a3f..5b98e8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
@@ -28,22 +28,17 @@ public class OperatorStateTest {
@Test
public void testOperatorState() {
- OperatorState<Integer> os = new SimpleState<Integer>(5);
-
+ OperatorState<Integer> os = new OperatorState<Integer>(5);
+
StateCheckpoint<Integer> scp = os.checkpoint();
-
- assertTrue(os.stateEquals(new SimpleState<Integer>().restore(scp)));
-
+
+ assertTrue(os.stateEquals(scp.restore()));
+
assertEquals((Integer) 5, os.getState());
-
- os.setState(10);
-
+
+ os.update(10);
+
assertEquals((Integer) 10, os.getState());
-
- scp.update(os.checkpoint());
-
- assertEquals((Integer)10,scp.getCheckpointedState());
-
}
}