You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:34 UTC

[07/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
deleted file mode 100644
index c33b94e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Implementation of the {@link OperatorState} interface for non-partitioned
- * user states. It provides methods for checkpointing and restoring operator
- * states upon failure using the provided {@link StateCheckpointer} and
- * {@link StateHandleProvider}.
- * 
- * @param <S>
- *            Type of the underlying {@link OperatorState}.
- * @param <C>
- *            Type of the state snapshot.
- */
-public class StreamOperatorState<S, C extends Serializable> implements OperatorState<S> {
-
-	private S state;
-	protected StateCheckpointer<S, C> checkpointer;
-	protected final StateHandleProvider<Serializable> provider;
-	
-	private boolean restored = true;
-	private Serializable checkpoint = null;
-
-	@SuppressWarnings("unchecked")
-	public StreamOperatorState(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
-		this.checkpointer = checkpointer;
-		this.provider = (StateHandleProvider<Serializable>) provider;
-	}
-	
-	@SuppressWarnings("unchecked")
-	public StreamOperatorState(StateHandleProvider<C> provider) {
-		this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider);
-	}
-
-	@Override
-	public S value() throws IOException {
-		if (!restored) {
-			// If the state is not restore it yet, restore at this point
-			restoreWithCheckpointer();
-		}
-		return state;
-	}
-
-	@Override
-	public void update(S state) throws IOException {
-		if (state == null) {
-			throw new RuntimeException("Cannot set state to null.");
-		}
-		if (!restored) {
-			// If the value is updated before the restore it is overwritten
-			restored = true;
-			checkpoint = false;
-		}
-		this.state = state;
-	}
-	
-	public void setDefaultState(S defaultState) throws IOException {
-		if (value() == null) {
-			update(defaultState);
-		}
-	}
-
-	public StateCheckpointer<S, C> getCheckpointer() {
-		return checkpointer;
-	}
-	
-	public void setCheckpointer(StateCheckpointer<S, C> checkpointer) {
-		this.checkpointer = checkpointer;
-	}
-
-	protected StateHandleProvider<Serializable> getStateHandleProvider() {
-		return provider;
-	}
-
-	public StateHandle<Serializable> snapshotState(long checkpointId, long checkpointTimestamp)
-			throws Exception {
-		// If the state is restored we take a snapshot, otherwise return the last checkpoint
-		return provider.createStateHandle(restored ? checkpointer.snapshotState(value(), checkpointId,
-				checkpointTimestamp) : checkpoint);
-	}
-
-	public void restoreState(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception {
-		// We set the checkpoint for lazy restore
-		checkpoint = snapshot.getState(userCodeClassLoader);
-		restored = false;
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void restoreWithCheckpointer() throws IOException {
-		update(checkpointer.restoreState((C) checkpoint));
-		restored = true;
-		checkpoint = null;
-	}
-
-	public Map<Serializable, S> getPartitionedState() throws Exception {
-		return ImmutableMap.of((Serializable) 0, state);
-	}
-	
-	@Override
-	public String toString() {
-		return state.toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
new file mode 100644
index 0000000..0fa5952
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.runtime.state.StateHandle;
+
+import java.io.InputStream;
+
+/**
+ * A state handle that produces an input stream when resolved. It is the
+ * {@link StateHandle} specialization whose state type is {@link InputStream}
+ * and declares no members of its own.
+ */
+public interface StreamStateHandle extends StateHandle<InputStream> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
deleted file mode 100644
index 9105fd2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-
-/**
- * StateHandle that wraps the StateHandles for the operator states of chained
- * tasks. This is needed so the wrapped handles are properly discarded.
- * 
- */
-public class WrapperStateHandle extends LocalStateHandle<Serializable> {
-
-	private static final long serialVersionUID = 1L;
-
-	public WrapperStateHandle(List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> state) {
-		super((Serializable) state);
-	}
-
-	@Override
-	public void discardState() throws Exception {
-		@SuppressWarnings("unchecked")
-		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates =
-				(List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) getState(null); // we can pass "null" here because the LocalStateHandle is not using the ClassLoader anyways
-		for (Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state : chainedStates) {
-			if (state != null) {
-				if (state.f0 != null) {
-					state.f0.discardState();
-				}
-				if (state.f1 != null) {
-					for (StateHandle<Serializable> opState : state.f1.values()) {
-						opState.discardState();
-					}
-				}
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
new file mode 100644
index 0000000..c4a376e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Base class for state that is stored in a file.
+ */
+public abstract class AbstractFileState implements java.io.Serializable {
+
+	private static final long serialVersionUID = 350284443258002355L;
+
+	/** Location of the state file; its URI is also what the file system is looked up from */
+	private final Path filePath;
+
+	/** File system handle, looked up on first use (transient, so not part of the serialized form) */
+	private transient FileSystem fs;
+
+	/**
+	 * Sets up the file state for the given location.
+	 *
+	 * @param filePath The path of the file in which the state is stored.
+	 */
+	protected AbstractFileState(Path filePath) {
+		this.filePath = filePath;
+	}
+
+	/**
+	 * Returns the location of the state file.
+	 * @return The path that was handed to the constructor.
+	 */
+	public Path getFilePath() {
+		return this.filePath;
+	}
+
+	/**
+	 * Discards the state: deletes the state file and then issues a non-recursive delete
+	 * for the directory that contained it. An {@link IOException} thrown by the directory
+	 * delete is ignored.
+	 *
+	 * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
+	 */
+	public void discardState() throws Exception {
+		getFileSystem().delete(this.filePath, false);
+
+		// best-effort cleanup of the enclosing directory: this delete is expected to fail
+		// while other files remain in the directory, in which case the IOException is dropped
+		try {
+			final FileSystem fileSystem = getFileSystem();
+			fileSystem.delete(this.filePath.getParent(), false);
+		}
+		catch (IOException ignored) {}
+	}
+
+	/**
+	 * Returns the file system that holds the state file. The handle is obtained from the
+	 * file path's URI when none is cached yet, and the cached handle is returned otherwise.
+	 *
+	 * @return The file system that stores the file state.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	protected FileSystem getFileSystem() throws IOException {
+		FileSystem resolved = this.fs;
+		if (resolved == null) {
+			resolved = FileSystem.get(this.filePath.toUri());
+			this.fs = resolved;
+		}
+		return resolved;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
new file mode 100644
index 0000000..9bf5ec1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ObjectInputStream;
+
+/**
+ * A state handle that points to state stored in a file via Java Serialization.
+ * 
+ * @param <T> The type of state pointed to by the state handle.
+ */
+public class FileSerializableStateHandle<T> extends AbstractFileState implements StateHandle<T> {
+
+	private static final long serialVersionUID = -657631394290213622L;
+
+	/**
+	 * Creates a new FileSerializableStateHandle pointing to state at the given file path.
+	 *
+	 * @param filePath The path to the file containing the checkpointed state.
+	 */
+	public FileSerializableStateHandle(Path filePath) {
+		super(filePath);
+	}
+
+	/**
+	 * Opens the state file and reads one object from it via Java deserialization.
+	 * Both streams are closed before the method returns, whether or not reading succeeds.
+	 *
+	 * @param classLoader The class loader handed to the object input stream for resolving classes.
+	 * @return The object read from the file, cast to the state type.
+	 * @throws Exception Thrown if the file cannot be opened or the object cannot be read.
+	 */
+	@Override
+	@SuppressWarnings("unchecked")
+	public T getState(ClassLoader classLoader) throws Exception {
+		// try-with-resources closes the streams on every path, so no file handle is leaked;
+		// the file stream is also closed if creating the object stream itself fails
+		try (FSDataInputStream inStream = getFileSystem().open(getFilePath());
+				ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader)) {
+			return (T) ois.readObject();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
new file mode 100644
index 0000000..79512d7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.state.StreamStateHandle;
+
+import java.io.InputStream;
+
+/**
+ * A state handle that points to state in a file system, accessible as an input stream.
+ */
+public class FileStreamStateHandle extends AbstractFileState implements StreamStateHandle {
+
+	private static final long serialVersionUID = -6826990484549987311L;
+
+	/**
+	 * Creates a handle for state stored in the file at the given path.
+	 *
+	 * @param filePath The path to the file containing the checkpointed state.
+	 */
+	public FileStreamStateHandle(Path filePath) {
+		super(filePath);
+	}
+
+	/**
+	 * Opens the state file and hands the resulting stream to the caller. The stream is
+	 * not closed here.
+	 *
+	 * @param userCodeClassLoader Not used by this implementation.
+	 * @return An input stream over the state file.
+	 * @throws Exception Thrown if the file system cannot be accessed or the file cannot be opened.
+	 */
+	@Override
+	public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
+		final InputStream stateStream = getFileSystem().open(getFilePath());
+		return stateStream;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
new file mode 100644
index 0000000..107a3be
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.filesystem;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.state.AbstractHeapKvState;
+
+import java.io.DataOutputStream;
+import java.util.HashMap;
+
+/**
+ * Heap-backed key/value state that is snapshotted into files.
+ * 
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, FsStateBackend> {
+
+	/** Backend that supplies the output streams into which snapshots of this state are written */
+	private final FsStateBackend backend;
+
+	/**
+	 * Creates an empty key/value state.
+	 *
+	 * @param keySerializer The serializer for the key.
+	 * @param valueSerializer The serializer for the value.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+	 * @param backend The file system state backend backing snapshots of this state
+	 */
+	public FsHeapKvState(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<V> valueSerializer,
+			V defaultValue,
+			FsStateBackend backend) {
+		super(keySerializer, valueSerializer, defaultValue);
+		this.backend = backend;
+	}
+
+	/**
+	 * Creates a key/value state that starts out with the given contents, for example
+	 * when the state is re-created from a snapshot.
+	 *
+	 * @param keySerializer The serializer for the key.
+	 * @param valueSerializer The serializer for the value.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+	 * @param state The map of key/value pairs to initialize the state with.
+	 * @param backend The file system state backend backing snapshots of this state
+	 */
+	public FsHeapKvState(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<V> valueSerializer,
+			V defaultValue,
+			HashMap<K, V> state,
+			FsStateBackend backend) {
+		super(keySerializer, valueSerializer, defaultValue, state);
+		this.backend = backend;
+	}
+
+	/**
+	 * Writes this state to a checkpoint output stream obtained from the backend and returns
+	 * a snapshot object that refers to the written file.
+	 *
+	 * @param checkpointId The checkpoint ID, passed on to the backend when the stream is created.
+	 * @param timestamp The checkpoint timestamp, passed on to the backend when the stream is created.
+	 * @return A snapshot carrying this state's serializers and the path returned by the stream.
+	 * @throws Exception Thrown if creating the stream or writing to it fails.
+	 */
+	@Override
+	public FsHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
+		try (FsStateBackend.FsCheckpointStateOutputStream stateStream =
+					backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
+
+			final DataOutputStream dataStream = new DataOutputStream(stateStream);
+			final OutputViewDataOutputStreamWrapper view = new OutputViewDataOutputStreamWrapper(dataStream);
+
+			// layout: the value of size() first, then whatever writeStateToOutputView emits
+			view.writeInt(size());
+			writeStateToOutputView(view);
+			view.flush();
+
+			// the path is obtained by closing the stream explicitly, before the
+			// try-with-resources block closes it again on exit
+			return new FsHeapKvStateSnapshot<>(
+					getKeySerializer(), getValueSerializer(), stateStream.closeAndGetPath());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
new file mode 100644
index 0000000..c7117f8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.filesystem;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.streaming.api.state.KvStateSnapshot;
+
+import java.io.DataInputStream;
+import java.util.HashMap;
+
+/**
+ * A snapshot of a heap key/value state stored in a file.
+ * 
+ * @param <K> The type of the key in the snapshot state.
+ * @param <V> The type of the value in the snapshot state.
+ */
+public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements KvStateSnapshot<K, V, FsStateBackend> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Name of the key serializer class */
+	private final String keySerializerClassName;
+
+	/** Name of the value serializer class */
+	private final String valueSerializerClassName;
+
+	/**
+	 * Creates a new state snapshot with data in the file system. Only the class names of
+	 * the two serializers are recorded in the snapshot object.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the values.
+	 * @param filePath The path where the snapshot data is stored.
+	 */
+	public FsHeapKvStateSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, Path filePath) {
+		super(filePath);
+		this.keySerializerClassName = keySerializer.getClass().getName();
+		this.valueSerializerClassName = valueSerializer.getClass().getName();
+	}
+
+	/**
+	 * Re-creates the key/value state from the snapshot file. The file is read as an entry
+	 * count followed by that many key/value pairs, each deserialized with the given serializers.
+	 *
+	 * @param stateBackend The backend whose file system is used to open the snapshot file; it
+	 *                     also becomes the backend of the restored state.
+	 * @param keySerializer The key serializer; its class name must equal the recorded one.
+	 * @param valueSerializer The value serializer; its class name must equal the recorded one.
+	 * @param defaultValue The default value passed on to the restored state.
+	 * @param classLoader Not used by this implementation.
+	 * @return The restored key/value state holding the entries read from the file.
+	 * @throws IllegalArgumentException Thrown if a serializer's class name differs from the recorded one.
+	 * @throws Exception Thrown, wrapping the cause, if opening or reading the snapshot file fails.
+	 */
+	@Override
+	public FsHeapKvState<K, V> restoreState(
+			FsStateBackend stateBackend,
+			final TypeSerializer<K> keySerializer,
+			final TypeSerializer<V> valueSerializer,
+			V defaultValue,
+			ClassLoader classLoader) throws Exception {
+
+		// validity checks
+		if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
+				!valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
+			// report the names in the same key/value order as the "(K/V)" label
+			throw new IllegalArgumentException(
+					"Cannot restore the state from the snapshot with the given serializers. " +
+							"State (K/V) was serialized with (" + keySerializerClassName +
+							"/" + valueSerializerClassName + ")");
+		}
+
+		// state restore
+		try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
+			InputViewDataInputStreamWrapper inView = new InputViewDataInputStreamWrapper(new DataInputStream(inStream));
+
+			final int numEntries = inView.readInt();
+			HashMap<K, V> stateMap = new HashMap<>(numEntries);
+
+			for (int i = 0; i < numEntries; i++) {
+				K key = keySerializer.deserialize(inView);
+				V value = valueSerializer.deserialize(inView);
+				stateMap.put(key, value);
+			}
+
+			return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap, stateBackend);
+		}
+		catch (Exception e) {
+			throw new Exception("Failed to restore state from file system", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
new file mode 100644
index 0000000..1fc2457
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.StateBackend;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+
+/**
+ * The file state backend is a state backend that stores the state of streaming jobs in a file system.
+ * 
+ * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
+ * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
+ * files for each state, for example:
+ * 
+ * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
+ */
+public class FsStateBackend extends StateBackend<FsStateBackend> {
+
+	private static final long serialVersionUID = -8191916350224044011L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);
+	
+	
+	/** The path to the directory for the checkpoint data, including the file system
+	 * description via scheme and optional authority */
+	private final Path basePath;
+	
+	/** The directory (job specific) into which this initialized instance of the backend stores its data */
+	private transient Path checkpointDirectory;
+	
+	/** Cached handle to the file system for file operations */
+	private transient FileSystem filesystem;
+
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(String checkpointDataUri) throws IOException {
+		this(new Path(checkpointDataUri));
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(Path checkpointDataUri) throws IOException {
+		this(checkpointDataUri.toUri());
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 * 
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 * 
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 * 
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(URI checkpointDataUri) throws IOException {
+		// without a scheme, the target file system would be ambiguous
+		final String uriScheme = checkpointDataUri.getScheme();
+		if (uriScheme == null) {
+			throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+					"Please specify the file system scheme explicitly in the URI.");
+		}
+
+		// the checkpoints need a dedicated directory; the file system root is rejected
+		final String directory = checkpointDataUri.getPath();
+		if (directory == null) {
+			throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
+					"Please specify a directory path for the checkpoint data.");
+		}
+		if (directory.isEmpty() || "/".equals(directory)) {
+			throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+		}
+
+		// Look up the file system for the URI. Its own URI (scheme and authority) is used below
+		// instead of the user-supplied one, so that the base path refers to exactly the same
+		// (distributed) file system on all hosts and carries the full host/port information,
+		// even if the given URI relied on the loaded configuration to fill that in.
+		this.filesystem = FileSystem.get(checkpointDataUri);
+		if (this.filesystem == null) {
+			throw new IOException("Could not find a file system for the given scheme in the available configurations.");
+		}
+
+		final URI fileSystemUri = this.filesystem.getUri();
+		final URI qualifiedUri;
+		try {
+			qualifiedUri = new URI(fileSystemUri.getScheme(), fileSystemUri.getAuthority(), directory, null, null);
+		}
+		catch (URISyntaxException e) {
+			throw new IOException(
+					String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s", 
+							checkpointDataUri, fileSystemUri), e);
+		}
+		this.basePath = new Path(qualifiedUri);
+	}
+
+	/**
+	 * Returns the root directory below which the state-containing files of all jobs are stored.
+	 * The job specific directory is created inside this directory.
+	 * 
+	 * @return The base directory.
+	 */
+	public Path getBasePath() {
+		return this.basePath;
+	}
+
+	/**
+	 * Returns the job specific directory into which this state backend writes its checkpoint
+	 * data. The result is null as long as the state backend has not been initialized.
+	 * 
+	 * @return The directory where this state backend stores its checkpoint data.
+	 */
+	public Path getCheckpointDirectory() {
+		return this.checkpointDirectory;
+	}
+
+	/**
+	 * Tells whether this state backend is initialized. Initialization is not preserved by
+	 * serialization: after each serialization, the state backend has to be initialized again.
+	 * 
+	 * @return True, if the file state backend has been initialized, false otherwise.
+	 */
+	public boolean isInitialized() {
+		return checkpointDirectory != null && filesystem != null;
+	}
+
+	/**
+	 * Gets the file system handle for the file system that stores the state for this backend.
+	 * 
+	 * @return This backend's file system handle.
+	 */
+	public FileSystem getFileSystem() {
+		if (filesystem != null) {
+			return filesystem;
+		}
+		else {
+			throw new IllegalStateException("State backend has not been initialized.");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  initialization and cleanup
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void initializeForJob(JobID jobId) throws Exception {
+		// every job writes into its own sub-directory of the base path
+		final Path jobDirectory = new Path(basePath, jobId.toString());
+		LOG.info("Initializing file state backend to URI " + jobDirectory);
+
+		// the file system handle is transient, so it is looked up again here
+		this.filesystem = basePath.getFileSystem();
+		this.filesystem.mkdirs(jobDirectory);
+		this.checkpointDirectory = jobDirectory;
+	}
+
+	@Override
+	public void disposeAllStateForCurrentJob() throws Exception {
+		FileSystem fs = this.filesystem;
+		Path dir = this.checkpointDirectory;
+		
+		if (fs != null && dir != null) {
+			this.filesystem = null;
+			this.checkpointDirectory = null;
+			fs.delete(dir, true);
+		}
+		else {
+			throw new IllegalStateException("state backend has not been initialized");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  state backend operations
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public <K, V> FsHeapKvState<K, V> createKvState(
+			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception {
+		return new FsHeapKvState<>(keySerializer, valueSerializer, defaultValue, this);
+	}
+
+	@Override
+	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
+			S state, long checkpointID, long timestamp) throws Exception
+	{
+		checkFileSystemInitialized();
+
+		// make sure the directory for that specific checkpoint exists
+		final Path checkpointDir = createCheckpointDirPath(checkpointID);
+		filesystem.mkdirs(checkpointDir);
+
+		Exception latestException = null;
+		// opening the file is attempted up to ten times, each time under a fresh random name
+		for (int attempt = 0; attempt < 10; attempt++) {
+			Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString());
+			FSDataOutputStream outStream;
+			try {
+				outStream = filesystem.create(targetPath, false);
+			}
+			catch (Exception e) {
+				latestException = e;
+				continue;
+			}
+
+			// write failures are not retried and propagate to the caller; try-with-resources
+			// guarantees the stream is closed in that case as well, instead of leaking it
+			try (ObjectOutputStream os = new ObjectOutputStream(outStream)) {
+				os.writeObject(state);
+			}
+			return new FileSerializableStateHandle<S>(targetPath);
+		}
+		throw new Exception("Could not open output stream for state backend", latestException);
+	}
+	
+	@Override
+	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+		checkFileSystemInitialized();
+
+		// the stream's file is placed in the directory of the given checkpoint
+		final Path chkDirectory = createCheckpointDirPath(checkpointID);
+		filesystem.mkdirs(chkDirectory);
+
+		// up to ten randomly named files are tried; only the most recent failure is reported
+		Exception lastFailure = null;
+		for (int remaining = 10; remaining > 0; remaining--) {
+			final Path candidate = new Path(chkDirectory, UUID.randomUUID().toString());
+			try {
+				return new FsCheckpointStateOutputStream(filesystem.create(candidate, false), candidate, filesystem);
+			}
+			catch (Exception e) {
+				lastFailure = e;
+			}
+		}
+		throw new Exception("Could not open output stream for state backend", lastFailure);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private void checkFileSystemInitialized() throws IllegalStateException {
+		// the transient handles are null after deserialization until initializeForJob() sets them
+		if (checkpointDirectory == null || filesystem == null) {
+			throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
+		}
+	}
+
+	private Path createCheckpointDirPath(long checkpointID) {
+		return new Path(checkpointDirectory, "chk-" + checkpointID);
+	}
+
+	@Override
+	public String toString() {
+		final boolean initialized = checkpointDirectory != null;
+		return initialized ? "File State Backend (initialized) @ " + checkpointDirectory : "File State Backend @ " + basePath;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Output stream for state checkpointing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A CheckpointStateOutputStream that writes into a file and returns the path to that file upon
+	 * closing.
+	 */
+	public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream {
+		/** The stream to the file that receives the state data */
+		private final FSDataOutputStream outStream;
+		/** The path of the file that the stream writes to */
+		private final Path filePath;
+		/** The file system used to remove the file if the stream is discarded */
+		private final FileSystem fs;
+		/** Set once the stream was closed; read and written under the lock on this object */
+		private boolean closed;
+
+		FsCheckpointStateOutputStream(FSDataOutputStream outStream, Path filePath, FileSystem fs) {
+			this.outStream = outStream;
+			this.filePath = filePath;
+			this.fs = fs;
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			outStream.write(b);
+		}
+
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			outStream.write(b, off, len);
+		}
+
+		@Override
+		public void flush() throws IOException {
+			outStream.flush();
+		}
+
+		/**
+		 * If the stream is only closed, we remove the produced file (cleanup through the auto close
+		 * feature, for example). The file is removed even if closing the underlying stream fails.
+		 * This method throws no exception if closing or the deletion fails, but only logs the error.
+		 */
+		@Override
+		public void close() {
+			synchronized (this) {
+				if (!closed) {
+					closed = true;
+					try {
+						outStream.close();
+					} catch (Exception e) {
+						LOG.warn("Cannot close discarded state stream to " + filePath, e);
+					}
+					// runs regardless of the close outcome, so a failed close leaves no file behind
+					try {
+						fs.delete(filePath, false);
+						// attempt to delete the parent (will fail and be ignored if the parent has more files)
+						try {
+							fs.delete(filePath.getParent(), false);
+						} catch (IOException ignored) {}
+					} catch (Exception e) {
+						LOG.warn("Cannot delete closed and discarded state stream to " + filePath, e);
+					}
+				}
+			}
+		}
+
+		@Override
+		public FileStreamStateHandle closeAndGetHandle() throws IOException {
+			return new FileStreamStateHandle(closeAndGetPath());
+		}
+
+		/**
+		 * Closes the stream and returns the path to the file that contains the stream's data.
+		 * @return The path to the file that contains the stream's data.
+		 * @throws IOException Thrown if the stream was already closed, or cannot be closed successfully.
+		 */
+		public Path closeAndGetPath() throws IOException {
+			synchronized (this) {
+				if (closed) {
+					throw new IOException("Stream has already been closed and discarded.");
+				}
+				closed = true;
+				outStream.close();
+				return filePath;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
new file mode 100644
index 0000000..f0ad6bd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.filesystem;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.state.StateBackendFactory;
+
+/**
+ * A factory that creates an {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend}
+ * from a configuration.
+ */
+public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
+	
+	/** The key under which the config stores the directory where checkpoints should be stored */
+	public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
+
+	/** Creates the backend for the checkpoint directory URI stored in the given configuration. */
+	@Override
+	public FsStateBackend createFromConfig(Configuration config) throws Exception {
+		String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+
+		if (checkpointDirURI == null) {
+			throw new IllegalConfigurationException(
+					"Cannot create the file system state backend: The configuration does not specify the " +
+							"checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
+		}
+
+		try {
+			Path path = new Path(checkpointDirURI);
+			return new FsStateBackend(path);
+		}
+		catch (IllegalArgumentException e) {
+			// the quote opened before the URI is closed again ahead of the final period
+			throw new Exception("Cannot initialize File System State Backend with URI '"
+					+ checkpointDirURI + "'.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
new file mode 100644
index 0000000..7952e58
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.memory;
+
+import org.apache.flink.streaming.api.state.StreamStateHandle;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+/**
+ * A state handle that keeps stream state in a byte array and serves it as an input stream.
+ */
+public final class ByteStreamStateHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = -5280226231200217594L;
+	
+	/** The state data; the array is stored by reference and is not copied */
+	private final byte[] data;
+
+	/**
+	 * Creates a new ByteStreamStateHandle containing the given data.
+	 * 
+	 * @param data The state data. The array is not copied, so it should not be modified afterwards.
+	 */
+	public ByteStreamStateHandle(byte[] data) {
+		this.data = data;
+	}
+
+	@Override
+	public InputStream getState(ClassLoader userCodeClassLoader) {
+		return new ByteArrayInputStream(data); // the class loader argument is not used here
+	}
+
+	@Override
+	public void discardState() {} // no-op: nothing is done when the state is discarded
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
new file mode 100644
index 0000000..e611887
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.memory;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.streaming.api.state.AbstractHeapKvState;
+
+import java.util.HashMap;
+
+/**
+ * Key/value state kept on the heap, whose snapshots are serialized copies held in memory.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, MemoryStateBackend> {
+
+	public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
+		super(keySerializer, valueSerializer, defaultValue);
+	}
+
+	public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
+							V defaultValue, HashMap<K, V> state) {
+		super(keySerializer, valueSerializer, defaultValue, state);
+	}
+
+	@Override
+	public MemoryHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
+		// size argument for the serializer: 16 per entry, but never less than 16
+		final int bufferSizeHint = Math.max(size() * 16, 16);
+		final DataOutputSerializer out = new DataOutputSerializer(bufferSizeHint);
+		writeStateToOutputView(out);
+		return new MemoryHeapKvStateSnapshot<K, V>(getKeySerializer(), getValueSerializer(), out.getCopyOfBuffer(), size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
new file mode 100644
index 0000000..7f50379
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.memory;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.streaming.api.state.KvStateSnapshot;
+
+import java.util.HashMap;
+
+/**
+ * A snapshot of a {@link MemHeapKvState} for a checkpoint. The data is stored in a heap byte
+ * array, in serialized form.
+ * 
+ * @param <K> The type of the key in the snapshot state.
+ * @param <V> The type of the value in the snapshot state.
+ */
+public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, MemoryStateBackend> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	/** Name of the key serializer class */
+	private final String keySerializerClassName;
+
+	/** Name of the value serializer class */
+	private final String valueSerializerClassName;
+	
+	/** The serialized data of the state key/value pairs */
+	private final byte[] data;
+	
+	/** The number of key/value pairs */
+	private final int numEntries;
+
+	/**
+	 * Creates a new heap memory state snapshot.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the values.
+	 * @param data The serialized data of the state key/value pairs
+	 * @param numEntries The number of key/value pairs
+	 */
+	public MemoryHeapKvStateSnapshot(TypeSerializer<K> keySerializer,
+						TypeSerializer<V> valueSerializer, byte[] data, int numEntries) {
+		this.keySerializerClassName = keySerializer.getClass().getName();
+		this.valueSerializerClassName = valueSerializer.getClass().getName();
+		this.data = data;
+		this.numEntries = numEntries;
+	}
+
+	/** Rebuilds the state from the serialized data; serializer classes must match the snapshot's. */
+	@Override
+	public MemHeapKvState<K, V> restoreState(
+			MemoryStateBackend stateBackend,
+			final TypeSerializer<K> keySerializer,
+			final TypeSerializer<V> valueSerializer,
+			V defaultValue,
+			ClassLoader classLoader) throws Exception {
+
+		// validity checks
+		if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
+			!valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
+				// report the serializers in the announced (K/V) order: key serializer first
+				throw new IllegalArgumentException(
+						"Cannot restore the state from the snapshot with the given serializers. " +
+						"State (K/V) was serialized with (" + keySerializerClassName + 
+						"/" + valueSerializerClassName + ")");
+		}
+		
+		// restore state
+		HashMap<K, V> stateMap = new HashMap<>(numEntries);
+		DataInputDeserializer in = new DataInputDeserializer(data, 0, data.length);
+		for (int i = 0; i < numEntries; i++) {
+			K key = keySerializer.deserialize(in);
+			V value = valueSerializer.deserialize(in);
+			stateMap.put(key, value);
+		}
+		
+		return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap);
+	}
+
+	/**
+	 * Discarding the heap state is a no-op.
+	 */
+	@Override
+	public void discardState() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
new file mode 100644
index 0000000..b2dfae8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.memory;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.api.state.StreamStateHandle;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A {@link StateBackend} that stores all its data and checkpoints in memory and has no
+ * capabilities to spill to disk. Checkpoints are serialized and the serialized data is
+ * transferred to the JobManager.
+ */
+public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
+
+	private static final long serialVersionUID = 4109305377809414635L;
+
+	/** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
+	private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
+
+	/** The maximal size, in bytes, that the snapshotted memory state may have */
+	private final int maxStateSize;
+
+	/**
+	 * Creates a new memory state backend that accepts states whose serialized forms are
+	 * up to the default state size (5 MiBytes).
+	 */
+	public MemoryStateBackend() {
+		this(DEFAULT_MAX_STATE_SIZE);
+	}
+
+	/**
+	 * Creates a new memory state backend that accepts states whose serialized forms are
+	 * up to the given number of bytes.
+	 *
+	 * @param maxStateSize The maximal size of the serialized state
+	 */
+	public MemoryStateBackend(int maxStateSize) {
+		this.maxStateSize = maxStateSize;
+	}
+
+	// ------------------------------------------------------------------------
+	//  initialization and cleanup
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void initializeForJob(JobID job) {
+		// nothing to do here
+	}
+
+	@Override
+	public void disposeAllStateForCurrentJob() {
+		// nothing to do here, GC will do it
+	}
+
+	// ------------------------------------------------------------------------
+	//  State backend operations
+	// ------------------------------------------------------------------------
+
+	@Override
+	public <K, V> MemHeapKvState<K, V> createKvState(
+			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
+		return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue);
+	}
+
+	/**
+	 * Serializes the given state into bytes using Java serialization and creates a state handle that
+	 * can re-create that state.
+	 *
+	 * @param state The state to checkpoint.
+	 * @param checkpointID The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @param <S> The type of the state.
+	 *
+	 * @return A state handle that contains the given state serialized as bytes.
+	 * @throws Exception Thrown, if the serialization fails or the serialized state exceeds the maximal size.
+	 */
+	@Override
+	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
+			S state, long checkpointID, long timestamp) throws Exception {
+		SerializedStateHandle<S> handle = new SerializedStateHandle<>(state);
+		checkSize(handle.getSizeOfSerializedState(), maxStateSize);
+		// return the handle that was size-checked; constructing a second one would repeat the work
+		return handle;
+	}
+
+	/** Creates an in-memory stream; its size is checked against {@code maxStateSize} when its bytes are retrieved. */
+	@Override
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(
+			long checkpointID, long timestamp) throws Exception {
+		return new MemoryCheckpointOutputStream(maxStateSize);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
+	}
+
+	/** Throws an {@link IOException} if {@code size} is larger than {@code maxSize}. */
+	static void checkSize(int size, int maxSize) throws IOException {
+		if (size > maxSize) {
+			throw new IOException("Size of the state is larger than the maximum permitted memory-backed state. Size="
+					+ size + " , maxSize=" + maxSize
+					+ " . Consider using a different state backend, like the File System State backend.");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A CheckpointStateOutputStream that writes into a byte array.
+	 */
+	public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {
+
+		private final ByteArrayOutputStream os = new ByteArrayOutputStream();
+		/** Upper bound for the number of bytes, enforced in {@link #closeAndGetBytes()} */
+		private final int maxSize;
+		/** Set by {@link #close()}; a closed stream no longer hands out its bytes */
+		private boolean closed;
+
+		public MemoryCheckpointOutputStream(int maxSize) {
+			this.maxSize = maxSize;
+		}
+
+		@Override
+		public void write(int b) {
+			os.write(b);
+		}
+
+		@Override
+		public void write(byte[] b, int off, int len) {
+			os.write(b, off, len);
+		}
+
+		/** Marks the stream as closed and drops all bytes buffered so far. */
+		@Override
+		public void close() {
+			closed = true;
+			os.reset();
+		}
+
+		/** Closes the stream and wraps its bytes into a {@link ByteStreamStateHandle}. */
+		@Override
+		public StreamStateHandle closeAndGetHandle() throws IOException {
+			return new ByteStreamStateHandle(closeAndGetBytes());
+		}
+
+		/**
+		 * Closes the stream and returns the byte array containing the stream's data.
+		 * @return The byte array containing the stream's data.
+		 * @throws IOException Thrown if the size of the data exceeds the maximal permitted size.
+		 * @throws IllegalStateException Thrown if the stream has already been closed.
+		 */
+		public byte[] closeAndGetBytes() throws IOException {
+			if (closed) {
+				throw new IllegalStateException("stream has already been closed");
+			}
+			checkSize(os.size(), maxSize);
+			// copy the bytes out before closing, because close() resets the buffer
+			byte[] bytes = os.toByteArray();
+			close();
+			return bytes;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Static default instance
+	// ------------------------------------------------------------------------
+
+	/** The default instance of this state backend, using the default maximal state size */
+	private static final MemoryStateBackend DEFAULT_INSTANCE = new MemoryStateBackend();
+
+	/**
+	 * Gets the default instance of this state backend, using the default maximal state size.
+	 * @return The default instance of this state backend.
+	 */
+	public static MemoryStateBackend defaultInstance() {
+		return DEFAULT_INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
new file mode 100644
index 0000000..163cadd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state.memory;
+
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+
+/**
+ * State handle that keeps the state it represents in serialized form, as bytes.
+ *
+ * @param <T> The type of state represented by this state handle.
+ */
+public class SerializedStateHandle<T> extends SerializedValue<T> implements StateHandle<T> {
+	private static final long serialVersionUID = 4145685722538475769L;
+
+	/** Creates a handle for the given state value. */
+	public SerializedStateHandle(T value) throws IOException {
+		super(value);
+	}
+
+	/** Deserializes the state, passing {@code classLoader} to the deserialization. */
+	@Override
+	public T getState(ClassLoader classLoader) throws Exception {
+		final T restored = deserializeValue(classLoader);
+		return restored;
+	}
+
+	/** No-op: discarding heap-memory backed state requires no work. */
+	@Override
+	public void discardState() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
index 67ccbd6..f9c95f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.transformations;
 
 import com.google.common.collect.Lists;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -110,7 +110,7 @@ public class CoFeedbackTransformation<F> extends StreamTransformation<F> {
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
index 11a2f33..87c7f16 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.transformations;
 
 import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
 import java.util.Collection;
 import java.util.List;
@@ -109,7 +109,7 @@ public class FeedbackTransformation<T> extends StreamTransformation<T> {
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
index 945d8eb..031c481 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.api.transformations;
 import com.google.common.collect.Lists;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
 
 import java.util.Collection;
 import java.util.List;
@@ -38,10 +38,12 @@ public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
 
 	private final StreamTransformation<IN> input;
 
-	private KeySelector<IN, ?> stateKeySelector;
-
 	private final OneInputStreamOperator<IN, OUT> operator;
 
+	private KeySelector<IN, ?> stateKeySelector;
+	
+	private TypeInformation<?> stateKeyType;
+
 	/**
 	 * Creates a new {@code OneInputTransformation} from the given input and operator.
 	 *
@@ -102,6 +104,14 @@ public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
 		return stateKeySelector;
 	}
 
+	public void setStateKeyType(TypeInformation<?> stateKeyType) {
+		this.stateKeyType = stateKeyType;
+	}
+
+	public TypeInformation<?> getStateKeyType() {
+		return stateKeyType;
+	}
+
 	@Override
 	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
 		List<StreamTransformation<?>> result = Lists.newArrayList();
@@ -111,7 +121,7 @@ public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		operator.setChainingStrategy(strategy);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
index 1165d5d..fa85349 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
@@ -18,7 +18,8 @@
 package org.apache.flink.streaming.api.transformations;
 
 import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
 import java.util.Collection;
@@ -34,6 +35,7 @@ import java.util.List;
  * @param <T> The type of the elements that result from this {@code PartitionTransformation}
  */
 public class PartitionTransformation<T> extends StreamTransformation<T> {
+	
 	private final StreamTransformation<T> input;
 	private final StreamPartitioner<T> partitioner;
 
@@ -74,7 +76,7 @@ public class PartitionTransformation<T> extends StreamTransformation<T> {
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		throw new UnsupportedOperationException("Cannot set chaining strategy on Union Transformation.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
index 92033bd..a66b65a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.transformations;
 
 import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
 import java.util.Collection;
 import java.util.List;
@@ -35,11 +35,12 @@ import java.util.List;
  * @param <T> The type of the elements that result from this {@code SelectTransformation}
  */
 public class SelectTransformation<T> extends StreamTransformation<T> {
+	
 	private final StreamTransformation<T> input;
-	private List<String> selectedNames;
+	private final List<String> selectedNames;
 
 	/**
-	 * Creates a new {@coe SelectionTransformation} from the given input that only selects
+	 * Creates a new {@code SelectTransformation} from the given input that only selects
 	 * the streams with the selected names.
 	 *
 	 * @param input The input {@code StreamTransformation}
@@ -76,7 +77,7 @@ public class SelectTransformation<T> extends StreamTransformation<T> {
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		throw new UnsupportedOperationException("Cannot set chaining strategy on Select Transformation.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
index 2a4e2d0..84ad6db 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
@@ -18,9 +18,10 @@
 package org.apache.flink.streaming.api.transformations;
 
 import com.google.common.collect.Lists;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 
 import java.util.Collection;
@@ -39,6 +40,8 @@ public class SinkTransformation<T> extends StreamTransformation<Object> {
 
 	// We need this because sinks can also have state that is partitioned by key
 	private KeySelector<T, ?> stateKeySelector;
+	
+	private TypeInformation<?> stateKeyType;
 
 	/**
 	 * Creates a new {@code SinkTransformation} from the given input {@code StreamTransformation}.
@@ -91,6 +94,14 @@ public class SinkTransformation<T> extends StreamTransformation<Object> {
 		return stateKeySelector;
 	}
 
+	public void setStateKeyType(TypeInformation<?> stateKeyType) {
+		this.stateKeyType = stateKeyType;
+	}
+
+	public TypeInformation<?> getStateKeyType() {
+		return stateKeyType;
+	}
+
 	@Override
 	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
 		List<StreamTransformation<?>> result = Lists.newArrayList();
@@ -100,7 +111,7 @@ public class SinkTransformation<T> extends StreamTransformation<Object> {
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		operator.setChainingStrategy(strategy);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
index c14c58c..9835606 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.transformations;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSource;
 
 import java.util.Collection;
@@ -64,7 +64,7 @@ public class SourceTransformation<T> extends StreamTransformation<T> {
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		operator.setChainingStrategy(strategy);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
index d392fd5..96c1c9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.transformations;
 
 import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
 import java.util.Collection;
 import java.util.List;
@@ -77,7 +77,7 @@ public class SplitTransformation<T> extends StreamTransformation<T> {
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index dadcfa2..4e6dc42 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -18,11 +18,12 @@
 package org.apache.flink.streaming.api.transformations;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
 import java.util.Collection;
 
@@ -216,7 +217,7 @@ public abstract class StreamTransformation<T> {
 	/**
 	 * Sets the chaining strategy of this {@code StreamTransformation}.
 	 */
-	public abstract void setChainingStrategy(StreamOperator.ChainingStrategy strategy);
+	public abstract void setChainingStrategy(ChainingStrategy strategy);
 
 	/**
 	 * Set the buffer timeout of this {@code StreamTransformation}. The timeout is used when

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
index e7273c5..30f0733 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.transformations;
 
 import com.google.common.collect.Lists;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 
 import java.util.Collection;
@@ -109,7 +109,7 @@ public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		operator.setChainingStrategy(strategy);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
index 4fa3c0a..3e1ff57 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.transformations;
 
 import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
 import java.util.Collection;
 import java.util.List;
@@ -74,7 +74,7 @@ public class UnionTransformation<T> extends StreamTransformation<T> {
 	}
 
 	@Override
-	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+	public final void setChainingStrategy(ChainingStrategy strategy) {
 		throw new UnsupportedOperationException("Cannot set chaining strategy on Union Transformation.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
index 6bb44dd..01e997d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
@@ -25,23 +25,21 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
-import java.util.List;
 
 public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
 
 	private OutputSelectorWrapper<OUT> outputSelectorWrapper;
 
-	private List<Output<OUT>> allOutputs;
+	private ArrayList<Output<StreamRecord<OUT>>> allOutputs;
 
 	public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
 		this.outputSelectorWrapper = outputSelectorWrapper;
-		allOutputs = new ArrayList<Output<OUT>>();
+		allOutputs = new ArrayList<Output<StreamRecord<OUT>>>();
 	}
-
-	@SuppressWarnings("unchecked,rawtypes")
-	public void addCollector(Output<StreamRecord<?>> output, StreamEdge edge) {
+	
+	public void addCollector(Output<StreamRecord<OUT>> output, StreamEdge edge) {
 		outputSelectorWrapper.addCollector(output, edge);
-		allOutputs.add((Output) output);
+		allOutputs.add(output);
 	}
 
 	@Override
@@ -53,13 +51,11 @@ public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
 
 	@Override
 	public void emitWatermark(Watermark mark) {
-		for (Output<OUT> output : allOutputs) {
+		for (Output<?> output : allOutputs) {
 			output.emitWatermark(mark);
 		}
 	}
 
 	@Override
-	public void close() {
-	}
-
+	public void close() {}
 }