You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:12 UTC

[12/14] flink git commit: [FLINK-1638] [streaming] State interface cleanup

[FLINK-1638] [streaming] State interface cleanup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf49ebbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf49ebbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf49ebbb

Branch: refs/heads/master
Commit: cf49ebbb488822c2dea7f59d35dbcc4a85da88c5
Parents: 09aa841
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Mar 5 14:53:11 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/state/OperatorState.java      |  54 ++---
 .../flink/runtime/state/StateCheckpoint.java    |  27 +--
 .../connectors/kafka/KafkaConsumerExample.java  |   4 +-
 .../kafka/api/simple/PersistentKafkaSource.java |  26 ++-
 .../kafka/api/simple/SimpleKafkaSource.java     |   2 +-
 .../streamvertex/StreamingRuntimeContext.java   |  12 +-
 .../flink/streaming/state/GraphState.java       |  61 ------
 .../apache/flink/streaming/state/MapState.java  | 203 -------------------
 .../streaming/state/PartitionableState.java     |   4 +
 .../flink/streaming/state/SimpleState.java      |  54 -----
 .../state/checkpoint/MapCheckpoint.java         |  73 -------
 .../flink/streaming/state/MapStateTest.java     | 130 ------------
 .../streaming/state/OperatorStateTest.java      |  21 +-
 13 files changed, 59 insertions(+), 612 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
index 74ea1a7..8dfd715 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
@@ -23,25 +23,15 @@ import java.io.Serializable;
  * Abstract class for representing operator states in Flink programs. By
  * implementing the methods declared in this abstraction the state of the
  * operator can be checkpointed by the fault tolerance mechanism.
- *
+ * 
  * @param <T>
  *            The type of the operator state.
  */
-public abstract class OperatorState<T> implements Serializable {
+public class OperatorState<T> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	public T state;
-
-	/**
-	 * Constructor used for initializing the state. In case of failure, the
-	 * state will be reinitialized using this constructor, then
-	 * {@link #restore(StateCheckpoint)} will be used to restore from the last
-	 * available backup.
-	 */
-	public OperatorState() {
-		state = null;
-	}
+	private T stateObject;
 
 	/**
 	 * Initializes the state using the given state object.
@@ -50,7 +40,7 @@ public abstract class OperatorState<T> implements Serializable {
 	 *            The initial state object
 	 */
 	public OperatorState(T initialState) {
-		state = initialState;
+		stateObject = initialState;
 	}
 
 	/**
@@ -59,45 +49,39 @@ public abstract class OperatorState<T> implements Serializable {
 	 * @return The state.
 	 */
 	public T getState() {
-		return state;
+		return stateObject;
 	}
 
 	/**
-	 * Sets the current state object.
+	 * Updates the current state object. States should only be updated using
+	 * this method to avoid concurrency issues.
 	 * 
-	 * @param state
-	 *            The new state object.
-	 * @return The operator state with the new state object set.
+	 * @param stateUpdate
+	 *            The update applied.
 	 */
-	public OperatorState<T> setState(T state) {
-		this.state = state;
-		return this;
+	@SuppressWarnings("unchecked")
+	public synchronized void update(Object stateUpdate) {
+		this.stateObject = (T) stateUpdate;
 	}
 
 	/**
 	 * Creates a {@link StateCheckpoint} that will be used to backup the state
-	 * for failure recovery.
+	 * for failure recovery. This method will be called by the state
+	 * checkpointer.
 	 * 
 	 * @return The {@link StateCheckpoint} created.
 	 */
-	public abstract StateCheckpoint<T> checkpoint();
-
-	/**
-	 * Restores the state from the given {@link StateCheckpoint}.
-	 * 
-	 * @param checkpoint
-	 *            The checkpoint to restore from
-	 * @return The restored operator.
-	 */
-	public abstract OperatorState<T> restore(StateCheckpoint<T> checkpoint);
+	public synchronized StateCheckpoint<T> checkpoint() {
+		return new StateCheckpoint<T>(this);
+	}
 
 	@Override
 	public String toString() {
-		return state.toString();
+		return stateObject.toString();
 	}
 
 	public boolean stateEquals(OperatorState<T> other) {
-		return state.equals(other.state);
+		return stateObject.equals(other.stateObject);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
index 4e4906e..a872e5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
@@ -32,7 +32,7 @@ public class StateCheckpoint<T> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	public T checkpointedState;
+	public OperatorState<T> checkpointedState;
 
 	/**
 	 * Creates a state checkpoint from the given {@link OperatorState}
@@ -41,34 +41,13 @@ public class StateCheckpoint<T> implements Serializable {
 	 *            The {@link OperatorState} to checkpoint.
 	 */
 	public StateCheckpoint(OperatorState<T> operatorState) {
-		this.checkpointedState = operatorState.getState();
+		this.checkpointedState = operatorState;
 	}
 
-	public StateCheckpoint() {
-		this.checkpointedState = null;
-	}
-
-	/**
-	 * Returns the state object for the checkpoint.
-	 * 
-	 * @return The checkpointed state object.
-	 */
-	public T getCheckpointedState() {
+	public OperatorState<T> restore() {
 		return checkpointedState;
 	}
 
-	/**
-	 * Updates the checkpoint from next one. Override this method to allow
-	 * incremental updates.
-	 * 
-	 * @param nextCheckpoint
-	 *            The {@link StateCheckpoint} will be used to update from.
-	 */
-	public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) {
-		this.checkpointedState = nextCheckpoint.getCheckpointedState();
-		return this;
-	}
-
 	@Override
 	public String toString() {
 		return checkpointedState.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index d9b03c9..0a0c623 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
-import org.apache.flink.streaming.state.SimpleState;
 
 public class KafkaConsumerExample {
 
@@ -43,7 +43,7 @@ public class KafkaConsumerExample {
 //						new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
 //						new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
 						new PersistentKafkaSource<String>(topic, host, port, 10L, new JavaDefaultStringSchema()))
-				.registerState("kafka", new SimpleState<Long>())
+				.registerState("kafka", new OperatorState<Long>(null))
 				.setParallelism(3)
 				.print().setParallelism(3);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 00d003a..5f15d12 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -18,29 +18,29 @@
 package org.apache.flink.streaming.connectors.kafka.api.simple;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.util.DeserializationSchema;
-import org.apache.flink.streaming.state.SimpleState;
 import org.apache.flink.util.Collector;
 
 public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 
-	private static final long NUM_RECORDS_PER_CHECKPOINT = 1000;
+	private static final long serialVersionUID = 1L;
 
 	private long initialOffset;
 
-	private transient SimpleState<Long> kafkaOffSet;
-	private transient long checkpointCounter;
+	private transient OperatorState<Long> kafkaOffSet;
 
 	/**
 	 * Partition index is set automatically by instance id.
-	 *
+	 * 
 	 * @param topicId
 	 * @param host
 	 * @param port
 	 * @param deserializationSchema
 	 */
-	public PersistentKafkaSource(String topicId, String host, int port, long initialOffset, DeserializationSchema<OUT> deserializationSchema) {
+	public PersistentKafkaSource(String topicId, String host, int port, long initialOffset,
+			DeserializationSchema<OUT> deserializationSchema) {
 		super(topicId, host, port, deserializationSchema);
 		this.initialOffset = initialOffset;
 	}
@@ -48,15 +48,16 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 	@Override
 	public void open(Configuration parameters) {
 		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		SimpleState<Long> lastKafkaOffSet = (SimpleState<Long>) context.getState("kafka");
+		@SuppressWarnings("unchecked")
+		OperatorState<Long> lastKafkaOffSet = (OperatorState<Long>) context.getState("kafka");
 
-		if (lastKafkaOffSet.getState() == null){
-			kafkaOffSet = new SimpleState<Long>(initialOffset);
+		if (lastKafkaOffSet.getState() == null) {
+			kafkaOffSet = new OperatorState<Long>(initialOffset);
+			context.registerState("kafka", kafkaOffSet);
 		} else {
 			kafkaOffSet = lastKafkaOffSet;
 		}
 
-		checkpointCounter = 0;
 		super.open(parameters);
 	}
 
@@ -78,10 +79,7 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 			gotMessage(msg);
 			OUT out = schema.deserialize(msg.getMessage());
 			collector.collect(out);
-			if (checkpointCounter > NUM_RECORDS_PER_CHECKPOINT){
-				kafkaOffSet = new SimpleState<Long>(msg.getOffset());
-				kafkaOffSet.checkpoint();
-			}
+			kafkaOffSet.update(msg.getOffset());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
index 473585c..fa925b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -46,6 +46,7 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
 	}
 
 	private void initializeConnection() {
+		//TODO: Fix this - partitionIndex is computed below but never used; the iterator is created with a hard-coded 0
 		int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
 		iterator = new KafkaConsumerIterator(host, port, topicId, 0, 100L);
 	}
@@ -58,7 +59,6 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
 	protected void gotMessage(MessageWithOffset msg) {
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public void run(Collector<OUT> collector) throws Exception {
 		while (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index a47100b..60dfe7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -40,8 +40,8 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 
 	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
 			ExecutionConfig executionConfig, Map<String, OperatorState<?>> operatorStates) {
-		super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
-				userCodeClassLoader, executionConfig, env.getCopyTask());
+		super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
+				executionConfig, env.getCopyTask());
 		this.env = env;
 		this.operatorStates = operatorStates;
 	}
@@ -67,6 +67,14 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 
 	}
 
+	public void registerState(String name, OperatorState<?> state) {
+		if (state == null) {
+			throw new RuntimeException("Cannot register null state");
+		} else {
+			operatorStates.put(name, state);
+		}
+	}
+
 	/**
 	 * Returns the input split provider associated with the operator.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/GraphState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/GraphState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/GraphState.java
deleted file mode 100644
index 945c0fd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/GraphState.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * GraphState represents a special {@link MapState} for storing graph
- * structures.
- *
- */
-public class GraphState extends MapState<Integer, Set<Integer>> {
-
-	private static final long serialVersionUID = 1L;
-
-	public GraphState() {
-		state = new HashMap<Integer, Set<Integer>>();
-	}
-
-	public void insertDirectedEdge(int sourceNode, int targetNode) {
-		if (!containsKey(sourceNode)) {
-			state.put(sourceNode, new HashSet<Integer>());
-		}
-		state.get(sourceNode).add(targetNode);
-		updatedItems.add(sourceNode);
-		removedItems.remove(sourceNode);
-	}
-
-	public void insertUndirectedEdge(int sourceNode, int targetNode) {
-		if (!state.containsKey(sourceNode)) {
-			state.put(sourceNode, new HashSet<Integer>());
-		}
-		if (!state.containsKey(targetNode)) {
-			state.put(targetNode, new HashSet<Integer>());
-		}
-		state.get(sourceNode).add(targetNode);
-		state.get(targetNode).add(sourceNode);
-
-		updatedItems.add(sourceNode);
-		removedItems.remove(sourceNode);
-		updatedItems.add(targetNode);
-		removedItems.remove(targetNode);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
deleted file mode 100644
index 1b861f5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
-import org.apache.flink.runtime.state.StateCheckpoint;
-
-
-/**
- * A Map that can be used as a partitionable operator state, for both fault
- * tolerance and rebalancing in Flink programs. The default repartitioner for
- * this operator repartitions by the hashcodes of the keys in the map. </br>
- * </br> The MapState also allows for incremental (data efficient) checkpointing
- * of the state. The entries in the map should only be modified by using the
- * dedicated methods: {@link #put(Object, Object)},{@link #remove(Object)},
- * {@link #putAll(Map)} and {@link #clear}. Directly modifying the entryset
- * will cause errors when checkpointing.
- *
- * @param <K>
- *            The type of the keys.
- * @param <V>
- *            The type of the values.
- */
-public class MapState<K, V> extends PartitionableState<Map<K, V>> implements Map<K, V> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected Set<K> removedItems;
-	protected Set<K> updatedItems;
-	protected boolean clear;
-
-	public MapState() {
-		this.state = new HashMap<K, V>();
-		this.updatedItems = new HashSet<K>();
-		this.removedItems = new HashSet<K>();
-	}
-
-	@Override
-	public StateCheckpoint<Map<K, V>> checkpoint() {
-		this.updatedItems.removeAll(removedItems);
-		StateCheckpoint<Map<K, V>> checkpoint = new MapCheckpoint<K, V>(this);
-		resetHistory();
-		return checkpoint;
-	}
-
-	@Override
-	public OperatorState<Map<K, V>> restore(StateCheckpoint<Map<K, V>> checkpoint) {
-		this.state = checkpoint.getCheckpointedState();
-		resetHistory();
-		return this;
-	}
-
-	@Override
-	public OperatorState<Map<K, V>>[] repartition(int numberOfPartitions) {
-		@SuppressWarnings("unchecked")
-		MapState<K, V>[] states = new MapState[numberOfPartitions];
-		for (int i = 0; i < numberOfPartitions; i++) {
-			states[i] = new MapState<K, V>();
-		}
-		for (Entry<K, V> entry : this.state.entrySet()) {
-			int partition = Math.abs(entry.getKey().hashCode() % numberOfPartitions);
-			states[partition].state.put(entry.getKey(), entry.getValue());
-		}
-		return states;
-	}
-
-	@Override
-	public OperatorState<Map<K, V>> reBuild(OperatorState<Map<K, V>>... parts) {
-		clear();
-		for (OperatorState<Map<K, V>> operatorState : parts) {
-			putAll(operatorState.state);
-		}
-		return this;
-	}
-
-	public void resetHistory() {
-		clear = false;
-		removedItems.clear();
-		updatedItems.clear();
-	}
-
-	@Override
-	public int size() {
-		return state.size();
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return state.isEmpty();
-	}
-
-	@Override
-	public boolean containsKey(Object key) {
-		return state.containsKey(key);
-	}
-
-	@Override
-	public boolean containsValue(Object value) {
-		return state.containsValue(value);
-	}
-
-	@Override
-	public V get(Object key) {
-		return state.get(key);
-	}
-
-	@Override
-	public V put(K key, V value) {
-		updatedItems.add(key);
-		removedItems.remove(key);
-		return state.put(key, value);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public V remove(Object key) {
-		V removed = state.remove(key);
-		if (removed != null) {
-			removedItems.add((K) key);
-			updatedItems.remove((K) key);
-		}
-
-		return removed;
-	}
-
-	@Override
-	public void putAll(Map<? extends K, ? extends V> m) {
-		for (K key : m.keySet()) {
-			updatedItems.add(key);
-			removedItems.remove(key);
-		}
-		state.putAll(m);
-	}
-
-	@Override
-	public void clear() {
-		clear = true;
-		updatedItems.clear();
-		removedItems.clear();
-		state.clear();
-	}
-
-	@Override
-	public Set<K> keySet() {
-		return state.keySet();
-	}
-
-	@Override
-	public Collection<V> values() {
-		return state.values();
-	}
-
-	@Override
-	public Set<Entry<K, V>> entrySet() {
-		return state.entrySet();
-	}
-
-	@Override
-	public Map<K, V> getState() {
-		throw new RuntimeException(
-				"State object should be accessed using the Map interface provided by the MapState");
-	}
-
-	@Override
-	public OperatorState<Map<K, V>> setState(Map<K, V> state) {
-		throw new RuntimeException(
-				"State object should be accessed using the Map interface provided by the MapState");
-	}
-
-	public Set<K> getRemovedItems() {
-		return removedItems;
-	}
-
-	public Set<K> getUpdatedItems() {
-		return updatedItems;
-	}
-
-	public boolean isCleared() {
-		return clear;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
index ddedcd9..a5e67ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
@@ -28,6 +28,10 @@ import org.apache.flink.runtime.state.OperatorState;
  */
 public abstract class PartitionableState<T> extends OperatorState<T> {
 
+	public PartitionableState(T initialState) {
+		super(initialState);
+	}
+
 	private static final long serialVersionUID = 1L;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
deleted file mode 100644
index b76f5ac..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-
-/**
- * Basic {@link OperatorState} for storing and updating simple objects. By default the
- * whole stored object is checkpointed at each backup. Override checkpoint to
- * allow a more fine-grained behavior.
- *
- * @param <T>
- *            The type of the stored state object.
- */
-public class SimpleState<T> extends OperatorState<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public SimpleState() {
-		super();
-	}
-
-	public SimpleState(T initialState) {
-		super(initialState);
-	}
-
-	@Override
-	public StateCheckpoint<T> checkpoint() {
-		return new StateCheckpoint<T>(this);
-	}
-
-	@Override
-	public OperatorState<T> restore(StateCheckpoint<T> checkpoint) {
-		this.state = checkpoint.getCheckpointedState();
-		return this;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
deleted file mode 100644
index ee27d4f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state.checkpoint;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.apache.flink.streaming.state.MapState;
-
-public class MapCheckpoint<K, V> extends StateCheckpoint<Map<K, V>> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected Set<K> removedItems;
-	protected Map<K, V> updatedItems;
-	protected boolean clear;
-
-	@SuppressWarnings("unchecked")
-	public MapCheckpoint(OperatorState<Map<K, V>> operatorState) {
-		if (operatorState instanceof MapState) {
-			MapState<K, V> mapState = (MapState<K, V>) operatorState;
-
-			this.removedItems = mapState.getRemovedItems();
-			this.clear = mapState.isCleared();
-
-			this.updatedItems = new HashMap<K, V>();
-			for (K key : mapState.getUpdatedItems()) {
-				this.updatedItems.put(key, mapState.get(key));
-			}
-			this.checkpointedState = this.updatedItems;
-
-		} else {
-			throw new RuntimeException("MapCheckpoint can only be used with MapState");
-		}
-
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public StateCheckpoint<Map<K, V>> update(StateCheckpoint<Map<K, V>> nextCheckpoint) {
-		MapCheckpoint<K, V> mapCheckpoint = (MapCheckpoint<K, V>) nextCheckpoint;
-		if (this.checkpointedState == null) {
-			this.checkpointedState = mapCheckpoint.updatedItems;
-		} else {
-			if (mapCheckpoint.clear) {
-				this.checkpointedState.clear();
-			}
-			for (Object key : mapCheckpoint.removedItems) {
-				this.checkpointedState.remove(key);
-			}
-			this.checkpointedState.putAll(mapCheckpoint.updatedItems);
-		}
-		return this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
deleted file mode 100644
index 98bafe4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.junit.Test;
-
-public class MapStateTest {
-
-	@Test
-	public void testMapState() {
-
-		Map<String, Integer> map = new HashMap<String, Integer>();
-		map.put("a", 1);
-		map.put("b", 2);
-		map.put("c", 3);
-		map.remove("a");
-
-		MapState<String, Integer> mapState = new MapState<String, Integer>();
-		mapState.put("a", 1);
-		mapState.put("b", 2);
-		mapState.put("c", 3);
-
-		assertEquals(1, (int) mapState.remove("a"));
-		assertEquals(null, mapState.remove("a"));
-		assertEquals(2, mapState.size());
-		assertEquals(map, mapState.state);
-		assertEquals(map.entrySet(), mapState.entrySet());
-		assertTrue(mapState.containsKey("b"));
-		assertFalse(mapState.containsKey("a"));
-
-		assertEquals(2, mapState.updatedItems.size());
-		assertEquals(1, mapState.removedItems.size());
-
-		Map<String, Integer> map2 = new HashMap<String, Integer>();
-		map2.put("a", 0);
-		map2.put("e", -1);
-
-		mapState.putAll(map2);
-
-		assertEquals(4, mapState.updatedItems.size());
-		assertEquals(0, mapState.removedItems.size());
-
-		mapState.clear();
-		assertEquals(new HashMap<String, Integer>(), mapState.state);
-		assertTrue(mapState.clear);
-		assertEquals(0, mapState.updatedItems.size());
-		assertEquals(0, mapState.removedItems.size());
-
-		mapState.putAll(map);
-		assertEquals(map.keySet(), mapState.updatedItems);
-
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testMapStateCheckpointing() {
-
-		Map<String, Integer> map = new HashMap<String, Integer>();
-		map.put("a", 1);
-		map.put("b", 2);
-		map.put("c", 3);
-
-		MapState<String, Integer> mapState = new MapState<String, Integer>();
-		mapState.putAll(map);
-
-		StateCheckpoint<Map<String, Integer>> mcp = mapState.checkpoint();
-		assertEquals(map, mcp.getCheckpointedState());
-
-		Map<String, Integer> map2 = new HashMap<String, Integer>();
-		map2.put("a", 0);
-		map2.put("e", -1);
-
-		mapState.put("a", 0);
-		mapState.put("e", -1);
-		mapState.remove("b");
-		StateCheckpoint<Map<String, Integer>> mcp2 = new MapCheckpoint<String, Integer>(mapState);
-		assertEquals(map2, mcp2.getCheckpointedState());
-		mcp.update(mcp2);
-		assertEquals(mapState.state, mcp.getCheckpointedState());
-
-		mapState.clear();
-		mapState.put("a", 1);
-		mapState.put("a", 2);
-		mapState.put("b", -3);
-		mapState.put("c", 0);
-		mapState.remove("b");
-		mcp.update(mapState.checkpoint());
-		assertEquals(mapState.state, mcp.getCheckpointedState());
-
-		MapState<String, Integer> mapState2 = (MapState<String, Integer>) new MapState<String, Integer>()
-				.restore(mcp);
-
-		assertTrue(mapState2.stateEquals(mapState));
-		mapState2.reBuild(mapState2.repartition(10));
-		assertTrue(mapState2.stateEquals(mapState));
-
-		MapState<Integer, Integer> mapState3 = new MapState<Integer, Integer>();
-		mapState3.put(1, 1);
-		mapState3.put(2, 1);
-
-		mapState3.reBuild(mapState3.repartition(2)[0]);
-		assertTrue(mapState3.containsKey(2));
-		assertFalse(mapState3.containsKey(1));
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf49ebbb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
index 4e07a3f..5b98e8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
@@ -28,22 +28,17 @@ public class OperatorStateTest {
 
 	@Test
 	public void testOperatorState() {
-		OperatorState<Integer> os = new SimpleState<Integer>(5);
-		
+		OperatorState<Integer> os = new OperatorState<Integer>(5);
+
 		StateCheckpoint<Integer> scp = os.checkpoint();
-		
-		assertTrue(os.stateEquals(new SimpleState<Integer>().restore(scp)));
-		
+
+		assertTrue(os.stateEquals(scp.restore()));
+
 		assertEquals((Integer) 5, os.getState());
-		
-		os.setState(10);
-		
+
+		os.update(10);
+
 		assertEquals((Integer) 10, os.getState());
-		
-		scp.update(os.checkpoint());
-		
-		assertEquals((Integer)10,scp.getCheckpointedState());
-		
 	}
 
 }