You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:22 UTC

[31/47] flink git commit: [FLINK-2354] [runtime] Replace old StateHandleProvider by StateStorageHelper in ZooKeeperStateHandleStore

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
new file mode 100644
index 0000000..481fb98
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.OperatingSystem;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class FileStateBackendTest {
+	
+	@Test
+	public void testSetupAndSerialization() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			final String backendDir = localFileUri(tempDir);
+			FsStateBackend originalBackend = new FsStateBackend(backendDir);
+			
+			assertFalse(originalBackend.isInitialized());
+			assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri());
+			assertNull(originalBackend.getCheckpointDirectory());
+			
+			// serialize / copy the backend
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
+			assertFalse(backend.isInitialized());
+			assertEquals(new URI(backendDir), backend.getBasePath().toUri());
+			assertNull(backend.getCheckpointDirectory());
+			
+			// no file operations should be possible right now
+			try {
+				backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
+				fail("should fail with an exception");
+			} catch (IllegalStateException e) {
+				// supreme!
+			}
+			
+			backend.initializeForJob(new JobID());
+			assertNotNull(backend.getCheckpointDirectory());
+			
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+			assertTrue(checkpointDir.exists());
+			assertTrue(isDirectoryEmpty(checkpointDir));
+			
+			backend.disposeAllStateForCurrentJob();
+			assertNull(backend.getCheckpointDirectory());
+			
+			assertTrue(isDirectoryEmpty(tempDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+	
+	@Test
+	public void testSerializableState() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+			String state1 = "dummy state";
+			String state2 = "row row row your boat";
+			Integer state3 = 42;
+			
+			StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
+			StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
+			StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
+
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertEquals(state1, handle1.getState(getClass().getClassLoader()));
+			handle1.discardState();
+			
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertEquals(state2, handle2.getState(getClass().getClassLoader()));
+			handle2.discardState();
+			
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertEquals(state3, handle3.getState(getClass().getClassLoader()));
+			handle3.discardState();
+			
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+
+	@Test
+	public void testStateOutputStream() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+			byte[] state1 = new byte[1274673];
+			byte[] state2 = new byte[1];
+			byte[] state3 = new byte[0];
+			byte[] state4 = new byte[177];
+			
+			Random rnd = new Random();
+			rnd.nextBytes(state1);
+			rnd.nextBytes(state2);
+			rnd.nextBytes(state3);
+			rnd.nextBytes(state4);
+
+			long checkpointId = 97231523452L;
+
+			FsStateBackend.FsCheckpointStateOutputStream stream1 = 
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			FsStateBackend.FsCheckpointStateOutputStream stream2 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			FsStateBackend.FsCheckpointStateOutputStream stream3 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			
+			stream1.write(state1);
+			stream2.write(state2);
+			stream3.write(state3);
+			
+			FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
+			FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
+			FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
+			
+			// use with try-with-resources
+			StreamStateHandle handle4;
+			try (StateBackend.CheckpointStateOutputStream stream4 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
+				stream4.write(state4);
+				handle4 = stream4.closeAndGetHandle();
+			}
+			
+			// close before accessing handle
+			StateBackend.CheckpointStateOutputStream stream5 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			stream5.write(state4);
+			stream5.close();
+			try {
+				stream5.closeAndGetHandle();
+				fail();
+			} catch (IOException e) {
+				// uh-huh
+			}
+			
+			validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
+			handle1.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			ensureLocalFileDeleted(handle1.getFilePath());
+			
+			validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
+			handle2.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			ensureLocalFileDeleted(handle2.getFilePath());
+			
+			validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
+			handle3.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			ensureLocalFileDeleted(handle3.getFilePath());
+			
+			validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
+			handle4.discardState();
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+
+	@Test
+	public void testKeyValueState() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+			KvState<Integer, String, FsStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+			assertEquals(0, kv.size());
+
+			// some modifications to the state
+			kv.setCurrentKey(1);
+			assertNull(kv.value());
+			kv.update("1");
+			assertEquals(1, kv.size());
+			kv.setCurrentKey(2);
+			assertNull(kv.value());
+			kv.update("2");
+			assertEquals(2, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("1", kv.value());
+			assertEquals(2, kv.size());
+
+			// draw a snapshot
+			KvStateSnapshot<Integer, String, FsStateBackend> snapshot1 =
+					kv.shapshot(682375462378L, System.currentTimeMillis());
+
+			// make some more modifications
+			kv.setCurrentKey(1);
+			kv.update("u1");
+			kv.setCurrentKey(2);
+			kv.update("u2");
+			kv.setCurrentKey(3);
+			kv.update("u3");
+
+			// draw another snapshot
+			KvStateSnapshot<Integer, String, FsStateBackend> snapshot2 =
+					kv.shapshot(682375462379L, System.currentTimeMillis());
+
+			// validate the original state
+			assertEquals(3, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("u1", kv.value());
+			kv.setCurrentKey(2);
+			assertEquals("u2", kv.value());
+			kv.setCurrentKey(3);
+			assertEquals("u3", kv.value());
+
+			// restore the first snapshot and validate it
+			KvState<Integer, String, FsStateBackend> restored1 = snapshot1.restoreState(backend,
+					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+			assertEquals(2, restored1.size());
+			restored1.setCurrentKey(1);
+			assertEquals("1", restored1.value());
+			restored1.setCurrentKey(2);
+			assertEquals("2", restored1.value());
+
+			// restore the second snapshot and validate it
+			KvState<Integer, String, FsStateBackend> restored2 = snapshot2.restoreState(backend,
+					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+			assertEquals(3, restored2.size());
+			restored2.setCurrentKey(1);
+			assertEquals("u1", restored2.value());
+			restored2.setCurrentKey(2);
+			assertEquals("u2", restored2.value());
+			restored2.setCurrentKey(3);
+			assertEquals("u3", restored2.value());
+
+			snapshot1.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+
+			snapshot2.discardState();
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+
+	@Test
+	public void testRestoreWithWrongSerializers() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+			
+			KvState<Integer, String, FsStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+			kv.setCurrentKey(1);
+			kv.update("1");
+			kv.setCurrentKey(2);
+			kv.update("2");
+
+			KvStateSnapshot<Integer, String, FsStateBackend> snapshot =
+					kv.shapshot(682375462378L, System.currentTimeMillis());
+
+
+			@SuppressWarnings("unchecked")
+			TypeSerializer<Integer> fakeIntSerializer =
+					(TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+			@SuppressWarnings("unchecked")
+			TypeSerializer<String> fakeStringSerializer =
+					(TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
+
+			try {
+				snapshot.restoreState(backend, fakeIntSerializer,
+						StringSerializer.INSTANCE, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+
+			try {
+				snapshot.restoreState(backend, IntSerializer.INSTANCE,
+						fakeStringSerializer, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+
+			try {
+				snapshot.restoreState(backend, fakeIntSerializer,
+						fakeStringSerializer, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+			
+			snapshot.discardState();
+
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	private static void ensureLocalFileDeleted(Path path) {
+		URI uri = path.toUri();
+		if ("file".equals(uri.getScheme())) {
+			File file = new File(uri.getPath());
+			assertFalse("file not properly deleted", file.exists());
+		}
+		else {
+			throw new IllegalArgumentException("not a local path");
+		}
+	}
+	
+	private static void deleteDirectorySilently(File dir) {
+		try {
+			FileUtils.deleteDirectory(dir);
+		}
+		catch (IOException ignored) {}
+	}
+	
+	private static boolean isDirectoryEmpty(File directory) {
+		String[] nested = directory.list();
+		return  nested == null || nested.length == 0;
+	}
+	
+	private static String localFileUri(File path) {
+		return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath();
+	}
+	
+	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
+		byte[] holder = new byte[data.length];
+		assertEquals("not enough data", holder.length, is.read(holder));
+		assertEquals("too much data", -1, is.read());
+		assertArrayEquals("wrong data", data, holder);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
new file mode 100644
index 0000000..5f95b33
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
+ */
+public class MemoryStateBackendTest {
+	
+	@Test
+	public void testSerializableState() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+
+			HashMap<String, Integer> state = new HashMap<>();
+			state.put("hey there", 2);
+			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+			
+			StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459);
+			assertNotNull(handle);
+			
+			HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader());
+			assertEquals(state, restored);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testOversizedState() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend(10);
+
+			HashMap<String, Integer> state = new HashMap<>();
+			state.put("hey there", 2);
+			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+			try {
+				backend.checkpointStateSerializable(state, 12, 459);
+				fail("this should cause an exception");
+			}
+			catch (IOException e) {
+				// now darling, isn't that exactly what we wanted?
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testStateStream() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+
+			HashMap<String, Integer> state = new HashMap<>();
+			state.put("hey there", 2);
+			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+			StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
+			ObjectOutputStream oos = new ObjectOutputStream(os);
+			oos.writeObject(state);
+			oos.flush();
+			StreamStateHandle handle = os.closeAndGetHandle();
+			
+			assertNotNull(handle);
+
+			ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader()));
+			assertEquals(state, ois.readObject());
+			assertTrue(ois.available() <= 0);
+			ois.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testOversizedStateStream() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend(10);
+
+			HashMap<String, Integer> state = new HashMap<>();
+			state.put("hey there", 2);
+			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+			StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
+			ObjectOutputStream oos = new ObjectOutputStream(os);
+			
+			try {
+				oos.writeObject(state);
+				oos.flush();
+				os.closeAndGetHandle();
+				fail("this should cause an exception");
+			}
+			catch (IOException e) {
+				// oh boy! what an exception!
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testKeyValueState() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+			
+			KvState<Integer, String, MemoryStateBackend> kv = 
+					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+			
+			assertEquals(0, kv.size());
+			
+			// some modifications to the state
+			kv.setCurrentKey(1);
+			assertNull(kv.value());
+			kv.update("1");
+			assertEquals(1, kv.size());
+			kv.setCurrentKey(2);
+			assertNull(kv.value());
+			kv.update("2");
+			assertEquals(2, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("1", kv.value());
+			assertEquals(2, kv.size());
+			
+			// draw a snapshot
+			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 = 
+					kv.shapshot(682375462378L, System.currentTimeMillis());
+			
+			// make some more modifications
+			kv.setCurrentKey(1);
+			kv.update("u1");
+			kv.setCurrentKey(2);
+			kv.update("u2");
+			kv.setCurrentKey(3);
+			kv.update("u3");
+
+			// draw another snapshot
+			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot2 =
+					kv.shapshot(682375462379L, System.currentTimeMillis());
+			
+			// validate the original state
+			assertEquals(3, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("u1", kv.value());
+			kv.setCurrentKey(2);
+			assertEquals("u2", kv.value());
+			kv.setCurrentKey(3);
+			assertEquals("u3", kv.value());
+			
+			// restore the first snapshot and validate it
+			KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend, 
+							IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+			assertEquals(2, restored1.size());
+			restored1.setCurrentKey(1);
+			assertEquals("1", restored1.value());
+			restored1.setCurrentKey(2);
+			assertEquals("2", restored1.value());
+
+			// restore the second snapshot and validate it
+			KvState<Integer, String, MemoryStateBackend> restored2 = snapshot2.restoreState(backend,
+					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+			assertEquals(3, restored2.size());
+			restored2.setCurrentKey(1);
+			assertEquals("u1", restored2.value());
+			restored2.setCurrentKey(2);
+			assertEquals("u2", restored2.value());
+			restored2.setCurrentKey(3);
+			assertEquals("u3", restored2.value());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRestoreWithWrongSerializers() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+			KvState<Integer, String, MemoryStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+			
+			kv.setCurrentKey(1);
+			kv.update("1");
+			kv.setCurrentKey(2);
+			kv.update("2");
+			
+			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
+					kv.shapshot(682375462378L, System.currentTimeMillis());
+
+
+			@SuppressWarnings("unchecked")
+			TypeSerializer<Integer> fakeIntSerializer = 
+					(TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+			@SuppressWarnings("unchecked")
+			TypeSerializer<String> fakeStringSerializer = 
+					(TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
+
+			try {
+				snapshot.restoreState(backend, fakeIntSerializer,
+						StringSerializer.INSTANCE, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+
+			try {
+				snapshot.restoreState(backend, IntSerializer.INSTANCE,
+						fakeStringSerializer, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+
+			try {
+				snapshot.restoreState(backend, fakeIntSerializer,
+						fakeStringSerializer, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index d2e5b6a..a65ec01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -79,7 +80,7 @@ public class ZooKeeperTestUtils {
 
 		// File system state backend
 		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-		config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, fsStateHandlePath + "/checkpoints");
+		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
 		config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery");
 
 		// Akka failure detection and execution retries

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
index f0130ec..788f70d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -23,7 +23,6 @@ import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.apache.zookeeper.CreateMode;
@@ -83,11 +82,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	 */
 	@Test
 	public void testAdd() throws Exception {
-		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+		LongStateStorage longStateStorage = new LongStateStorage();
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
+				ZooKeeper.getClient(), longStateStorage);
 
 		// Config
 		final String pathInZooKeeper = "/testAdd";
@@ -98,8 +95,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 
 		// Verify
 		// State handle created
-		assertEquals(1, stateHandleProvider.getStateHandles().size());
-		assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null));
+		assertEquals(1, store.getAll().size());
+		assertEquals(state, store.get(pathInZooKeeper).getState(null));
 
 		// Path created and is persistent
 		Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
@@ -120,10 +117,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	 */
 	@Test
 	public void testAddWithCreateMode() throws Exception {
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+		LongStateStorage longStateStorage = new LongStateStorage();
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
+				ZooKeeper.getClient(), longStateStorage);
 
 		// Config
 		Long state = 3457347234L;
@@ -151,8 +147,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 
 			// Verify
 			// State handle created
-			assertEquals(i + 1, stateHandleProvider.getStateHandles().size());
-			assertEquals(state, stateHandleProvider.getStateHandles().get(i).getState(null));
+			assertEquals(i + 1, store.getAll().size());
+			assertEquals(state, longStateStorage.getStateHandles().get(i).getState(null));
 
 			// Path created
 			Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
@@ -182,7 +178,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	 */
 	@Test(expected = Exception.class)
 	public void testAddAlreadyExistingPath() throws Exception {
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -198,7 +194,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testAddDiscardStateHandleAfterFailure() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		CuratorFramework client = spy(ZooKeeper.getClient());
 		when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
@@ -231,7 +227,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testReplace() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -270,10 +266,10 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	 */
 	@Test(expected = Exception.class)
 	public void testReplaceNonExistingPath() throws Exception {
-		StateHandleProvider<Long> stateHandleProvider = new LongStateHandleProvider();
+		StateStorageHelper<Long> stateStorage = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider);
+				ZooKeeper.getClient(), stateStorage);
 
 		store.replace("/testReplaceNonExistingPath", 0, 1L);
 	}
@@ -284,7 +280,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		CuratorFramework client = spy(ZooKeeper.getClient());
 		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
@@ -329,7 +325,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testGetAndExists() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -354,7 +350,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	 */
 	@Test(expected = Exception.class)
 	public void testGetNonExistingPath() throws Exception {
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -368,7 +364,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testGetAll() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -399,7 +395,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testGetAllSortedByName() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -429,7 +425,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testRemove() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -453,7 +449,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testRemoveWithCallback() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -492,7 +488,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testRemoveAndDiscardState() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -514,7 +510,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	@Test
 	public void testRemoveAndDiscardAllState() throws Exception {
 		// Setup
-		LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 				ZooKeeper.getClient(), stateHandleProvider);
@@ -543,21 +539,19 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 	// Simple test helpers
 	// ---------------------------------------------------------------------------------------------
 
-	private static class LongStateHandleProvider implements StateHandleProvider<Long> {
-
-		private static final long serialVersionUID = 4572084854499402276L;
+	private static class LongStateStorage implements StateStorageHelper<Long> {
 
 		private final List<LongStateHandle> stateHandles = new ArrayList<>();
 
 		@Override
-		public StateHandle<Long> createStateHandle(Long state) {
+		public StateHandle<Long> store(Long state) throws Exception {
 			LongStateHandle stateHandle = new LongStateHandle(state);
 			stateHandles.add(stateHandle);
 
 			return stateHandle;
 		}
 
-		public List<LongStateHandle> getStateHandles() {
+		List<LongStateHandle> getStateHandles() {
 			return stateHandles;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 1ca02aa..f77ed07 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')
@@ -36,3 +36,4 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
+log4j.logger.org.apache.flink.runtime.blob=DEBUG

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 8b7fb1c..4e4acd2 100644
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -27,11 +27,11 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.StreamStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 26e1c9e..98506e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -63,7 +63,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.SplittableIterator;
@@ -372,11 +372,11 @@ public abstract class StreamExecutionEnvironment {
 	 * the key/value state, and for checkpointed functions (implementing the interface
 	 * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}).
 	 *
-	 * <p>The {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend} for example
+	 * <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
 	 * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
 	 * but can checkpoint only small states (some counters).
 	 * 
-	 * <p>In contrast, the {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend}
+	 * <p>In contrast, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
 	 * stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated
 	 * file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon
 	 * failures of individual nodes and that streaming program can be executed highly available and strongly

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
index 3817ede..3ac63af 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
@@ -26,8 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.state.SerializedCheckpointData;
+import org.apache.flink.runtime.state.SerializedCheckpointData;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 76be598..11bf84f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.util.ClassLoaderUtil;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
 import org.apache.flink.util.InstantiationUtil;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 0652406..be020d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -50,7 +50,7 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 9e60e9a..078679d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.state.KvState;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index a991fd3..17bd08d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
deleted file mode 100644
index b974674..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Base class for key/value state implementations that are backed by a regular heap hash map. The
- * concrete implementations define how the state is checkpointed.
- * 
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- * @param <Backend> The type of the backend that snapshots this key/value state.
- */
-public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> {
-
-	/** Map containing the actual key/value pairs */
-	private final HashMap<K, V> state;
-	
-	/** The serializer for the keys */
-	private final TypeSerializer<K> keySerializer;
-
-	/** The serializer for the values */
-	private final TypeSerializer<V> valueSerializer;
-	
-	/** The value that is returned when no other value has been associated with a key, yet */
-	private final V defaultValue;
-	
-	/** The current key, which the next value methods will refer to */
-	private K currentKey;
-	
-	/**
-	 * Creates a new empty key/value state.
-	 * 
-	 * @param keySerializer The serializer for the keys.
-	 * @param valueSerializer The serializer for the values.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
-	 */
-	protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
-									TypeSerializer<V> valueSerializer,
-									V defaultValue) {
-		this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>());
-	}
-
-	/**
-	 * Creates a new key/value state for the given hash map of key/value pairs.
-	 * 
-	 * @param keySerializer The serializer for the keys.
-	 * @param valueSerializer The serializer for the values.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
-	 * @param state The state map to use in this kev/value state. May contain initial state.   
-	 */
-	protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
-									TypeSerializer<V> valueSerializer,
-									V defaultValue,
-									HashMap<K, V> state) {
-		this.state = requireNonNull(state);
-		this.keySerializer = requireNonNull(keySerializer);
-		this.valueSerializer = requireNonNull(valueSerializer);
-		this.defaultValue = defaultValue;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public V value() {
-		V value = state.get(currentKey);
-		return value != null ? value : defaultValue;
-	}
-
-	@Override
-	public void update(V value) {
-		if (value != null) {
-			state.put(currentKey, value);
-		}
-		else {
-			state.remove(currentKey);
-		}
-	}
-
-	@Override
-	public void setCurrentKey(K currentKey) {
-		this.currentKey = currentKey;
-	}
-
-	@Override
-	public int size() {
-		return state.size();
-	}
-
-	@Override
-	public void dispose() {
-		state.clear();
-	}
-
-	/**
-	 * Gets the serializer for the keys.
-	 * @return The serializer for the keys.
-	 */
-	public TypeSerializer<K> getKeySerializer() {
-		return keySerializer;
-	}
-
-	/**
-	 * Gets the serializer for the values.
-	 * @return The serializer for the values.
-	 */
-	public TypeSerializer<V> getValueSerializer() {
-		return valueSerializer;
-	}
-
-	// ------------------------------------------------------------------------
-	//  checkpointing utilities
-	// ------------------------------------------------------------------------
-	
-	protected void writeStateToOutputView(final DataOutputView out) throws IOException {
-		for (Map.Entry<K, V> entry : state.entrySet()) {
-			keySerializer.serialize(entry.getKey(), out);
-			valueSerializer.serialize(entry.getValue(), out);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
deleted file mode 100644
index 9c628f8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.state.OperatorState;
-
-/**
- * Key/Value state implementation for user-defined state. The state is backed by a state
- * backend, which typically follows one of the following patterns: Either the state is stored
- * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
- * state backend into some store (during checkpoints), or the key/value state is in fact backed
- * by an external key/value store as the state backend, and checkpoints merely record the
- * metadata of what is considered part of the checkpoint.
- * 
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public interface KvState<K, V, Backend extends StateBackend<Backend>> extends OperatorState<V> {
-
-	/**
-	 * Sets the current key, which will be used to retrieve values for the next calls to
-	 * {@link #value()} and {@link #update(Object)}.
-	 * 
-	 * @param key The key.
-	 */
-	void setCurrentKey(K key);
-
-	/**
-	 * Creates a snapshot of this state.
-	 * 
-	 * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @return A snapshot handle for this key/value state.
-	 * 
-	 * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
-	 *                   can react to failed snapshots.
-	 */
-	KvStateSnapshot<K, V, Backend> shapshot(long checkpointId, long timestamp) throws Exception;
-
-	/**
-	 * Gets the number of key/value pairs currently stored in the state. Note that is a key
-	 * has been associated with "null", the key is removed from the state an will not
-	 * be counted here.
-	 *
-	 * @return The number of key/value pairs currently stored in the state.
-	 */
-	int size();
-
-	/**
-	 * Disposes the key/value state, releasing all occupied resources.
-	 */
-	void dispose();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
deleted file mode 100644
index 6aa7a1e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * This class represents a snapshot of the {@link KvState}, taken for a checkpoint. Where exactly
- * the snapshot stores the snapshot data (in this object, in an external data store, etc) depends
- * on the actual implementation. This snapshot defines merely how to restore the state and
- * how to discard the state.
- *
- * <p>One possible implementation is that this snapshot simply contains a copy of the key/value map.
- * 
- * <p>Another possible implementation for this snapshot is that the key/value map is serialized into
- * a file and this snapshot object contains a pointer to that file.
- *
- * @param <K> The type of the key
- * @param <V> The type of the value
- * @param <Backend> The type of the backend that can restore the state from this snapshot.
- */
-public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> extends java.io.Serializable {
-
-	/**
-	 * Loads the key/value state back from this snapshot.
-	 * 
-	 * 
-	 * @param stateBackend The state backend that created this snapshot and can restore the key/value state
-	 *                     from this snapshot.
-	 * @param keySerializer The serializer for the keys.
-	 * @param valueSerializer The serializer for the values.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.   
-	 * @param classLoader The class loader for user-defined types.
-	 * 
-	 * @return An instance of the key/value state loaded from this snapshot.
-	 * 
-	 * @throws Exception Exceptions can occur during the state loading and are forwarded. 
-	 */
-	KvState<K, V, Backend> restoreState(
-			Backend stateBackend,
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<V> valueSerializer,
-			V defaultValue,
-			ClassLoader classLoader) throws Exception;
-
-
-	/**
-	 * Discards the state snapshot, removing any resources occupied by it.
-	 * 
-	 * @throws Exception Exceptions occurring during the state disposal should be forwarded.
-	 */
-	void discardState() throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
deleted file mode 100644
index 2bbb4e2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This class represents serialized checkpoint data for a collection of elements.
- */
-public class SerializedCheckpointData implements java.io.Serializable {
-
-	private static final long serialVersionUID = -8783744683896503488L;
-	
-	/** ID of the checkpoint for which the IDs are stored */
-	private final long checkpointId;
-
-	/** The serialized elements */
-	private final byte[] serializedData;
-
-	/** The number of elements in the checkpoint */
-	private final int numIds;
-
-	/**
-	 * Creates a SerializedCheckpointData object for the given serialized data.
-	 * 
-	 * @param checkpointId The checkpointId of the checkpoint.
-	 * @param serializedData The serialized IDs in this checkpoint.
-	 * @param numIds The number of IDs in the checkpoint.
-	 */
-	public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) {
-		this.checkpointId = checkpointId;
-		this.serializedData = serializedData;
-		this.numIds = numIds;
-	}
-
-	/**
-	 * Gets the checkpointId of the checkpoint.
-	 * @return The checkpointId of the checkpoint.
-	 */
-	public long getCheckpointId() {
-		return checkpointId;
-	}
-
-	/**
-	 * Gets the binary data for the serialized elements.
-	 * @return The binary data for the serialized elements.
-	 */
-	public byte[] getSerializedData() {
-		return serializedData;
-	}
-
-	/**
-	 * Gets the number of IDs in the checkpoint.
-	 * @return The number of IDs in the checkpoint.
-	 */
-	public int getNumIds() {
-		return numIds;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Serialize to Checkpoint
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Converts a list of checkpoints with elements into an array of SerializedCheckpointData.
-	 * 
-	 * @param checkpoints The checkpoints to be converted into SerializedCheckpointData.
-	 * @param serializer The serializer to serialize the IDs.
-	 * @param <T> The type of the ID.
-	 * @return An array of serializable SerializedCheckpointData, one per entry in the given deque.
-	 * 
-	 * @throws IOException Thrown, if the serialization fails.
-	 */
-	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
-												TypeSerializer<T> serializer) throws IOException {
-		return fromDeque(checkpoints, serializer, new DataOutputSerializer(128));
-	}
-
-	/**
-	 * Converts a list of checkpoints into an array of SerializedCheckpointData.
-	 *
-	 * @param checkpoints The checkpoints to be converted into SerializedCheckpointData.
-	 * @param serializer The serializer to serialize the IDs.
-	 * @param outputBuffer The reusable serialization buffer.
-	 * @param <T> The type of the ID.
-	 * @return An array of serializable SerializedCheckpointData, one per entry in the given deque.
-	 *
-	 * @throws IOException Thrown, if the serialization fails.
-	 */
-	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
-												TypeSerializer<T> serializer,
-												DataOutputSerializer outputBuffer) throws IOException {
-		SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()];
-		
-		int pos = 0;
-		for (Tuple2<Long, List<T>> checkpoint : checkpoints) {
-			outputBuffer.clear();
-			List<T> checkpointIds = checkpoint.f1;
-			
-			for (T id : checkpointIds) {
-				serializer.serialize(id, outputBuffer);
-			}
-
-			serializedCheckpoints[pos++] = new SerializedCheckpointData(
-					checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size());
-		}
-		
-		return serializedCheckpoints;
-	}
-
-	// ------------------------------------------------------------------------
-	//  De-Serialize from Checkpoint
-	// ------------------------------------------------------------------------
-
-	/**
-	 * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints.
-	 * 
-	 * @param data The data to be deserialized.
-	 * @param serializer The serializer used to deserialize the data.
-	 * @param <T> The type of the elements.
-	 * @return An ArrayDeque of element checkpoints.
-	 * 
-	 * @throws IOException Thrown, if the deserialization fails.
-	 */
-	public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque(
-			SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException
-	{
-		ArrayDeque<Tuple2<Long, List<T>>> deque = new ArrayDeque<>(data.length);
-		DataInputDeserializer deser = null;
-		
-		for (SerializedCheckpointData checkpoint : data) {
-			byte[] serializedData = checkpoint.getSerializedData();
-			if (deser == null) {
-				deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
-			}
-			else {
-				deser.setBuffer(serializedData, 0, serializedData.length);
-			}
-			
-			final List<T> ids = new ArrayList<>(checkpoint.getNumIds());
-			final int numIds = checkpoint.getNumIds();
-			
-			for (int i = 0; i < numIds; i++) {
-				ids.add(serializer.deserialize(deser));
-			}
-
-			deque.addLast(new Tuple2<Long, List<T>>(checkpoint.checkpointId, ids));
-		}
-		
-		return deque;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
deleted file mode 100644
index f4391ad..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.StateHandle;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * A state backend defines how state is stored and snapshotted during checkpoints.
- * 
- * @param <Backend> The type of backend itself. This generic parameter is used to refer to the
- *                  type of backend when creating state backed by this backend.
- */
-public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 4620413814639220247L;
-	
-	// ------------------------------------------------------------------------
-	//  initialization and cleanup
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * This method is called by the task upon deployment to initialize the state backend for
-	 * data for a specific job.
-	 * 
-	 * @param job The ID of the job for which the state backend instance checkpoints data.
-	 * @throws Exception Overwritten versions of this method may throw exceptions, in which
-	 *                   case the job that uses the state backend is considered failed during
-	 *                   deployment.
-	 */
-	public abstract void initializeForJob(JobID job) throws Exception;
-
-	/**
-	 * Disposes all state associated with the current job.
-	 * 
-	 * @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
-	 */
-	public abstract void disposeAllStateForCurrentJob() throws Exception;
-
-	/**
-	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
-	 * checkpoint data.
-	 * 
-	 * @throws Exception Exceptions can be forwarded and will be logged by the system
-	 */
-	public abstract void close() throws Exception;
-	
-	// ------------------------------------------------------------------------
-	//  key/value state
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a key/value state backed by this state backend.
-	 * 
-	 * @param keySerializer The serializer for the key.
-	 * @param valueSerializer The serializer for the value.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
-	 * @param <K> The type of the key.
-	 * @param <V> The type of the value.
-	 * 
-	 * @return A new key/value state backed by this backend.
-	 * 
-	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
-	 */
-	public abstract <K, V> KvState<K, V, Backend> createKvState(
-			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-			V defaultValue) throws Exception;
-	
-	
-	// ------------------------------------------------------------------------
-	//  storing state for a checkpoint
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates an output stream that writes into the state of the given checkpoint. When the stream
-	 * is closed, it returns a state handle that can retrieve the state back.
-	 * 
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @return An output stream that writes state for the given checkpoint.
-	 * 
-	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
-	 */
-	public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
-			long checkpointID, long timestamp) throws Exception;
-	
-	/**
-	 * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
-	 * When the stream is closed, it returns a state handle that can retrieve the state back.
-	 *
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @return A DataOutputView stream that writes state for the given checkpoint.
-	 *
-	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
-	 */
-	public CheckpointStateOutputView createCheckpointStateOutputView(
-			long checkpointID, long timestamp) throws Exception {
-		return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
-	}
-
-	/**
-	 * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
-	 * 
-	 * @param state The state to be checkpointed.
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @param <S> The type of the state.
-	 * 
-	 * @return A state handle that can retrieve the checkpointed state.
-	 * 
-	 * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
-	 */
-	public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
-			S state, long checkpointID, long timestamp) throws Exception;
-	
-	
-	// ------------------------------------------------------------------------
-	//  Checkpoint state output stream
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
-	 */
-	public static abstract class CheckpointStateOutputStream extends OutputStream {
-
-		/**
-		 * Closes the stream and gets a state handle that can create an input stream
-		 * producing the data written to this stream.
-		 * 
-		 * @return A state handle that can create an input stream producing the data written to this stream.
-		 * @throws IOException Thrown, if the stream cannot be closed.
-		 */
-		public abstract StreamStateHandle closeAndGetHandle() throws IOException;
-	}
-
-	/**
-	 * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
-	 */
-	public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
-		
-		private final CheckpointStateOutputStream out;
-		
-		public CheckpointStateOutputView(CheckpointStateOutputStream out) {
-			super(out);
-			this.out = out;
-		}
-
-		/**
-		 * Closes the stream and gets a state handle that can create a DataInputView
-		 * producing the data written to this stream.
-		 *
-		 * @return A state handle that can create an input stream producing the data written to this stream.
-		 * @throws IOException Thrown, if the stream cannot be closed.
-		 */
-		public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
-			return new DataInputViewHandle(out.closeAndGetHandle());
-		}
-
-		@Override
-		public void close() throws IOException {
-			out.close();
-		}
-	}
-
-	/**
-	 * Simple state handle that resolves a {@link DataInputView} from a StreamStateHandle.
-	 */
-	private static final class DataInputViewHandle implements StateHandle<DataInputView> {
-
-		private static final long serialVersionUID = 2891559813513532079L;
-		
-		private final StreamStateHandle stream;
-
-		private DataInputViewHandle(StreamStateHandle stream) {
-			this.stream = stream;
-		}
-
-		@Override
-		public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
-			return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader)); 
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			stream.discardState();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
deleted file mode 100644
index ad87eae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.configuration.Configuration;
-
-/**
- * A factory to create a specific state backend. The state backend creation gets a Configuration
- * object that can be used to read further config values.
- * 
- * @param <T> The type of the state backend created.
- */
-public interface StateBackendFactory<T extends StateBackend<T>> {
-
-	/**
-	 * Creates the state backend, optionally using the given configuration.
-	 * 
-	 * @param config The Flink configuration (loaded by the TaskManager).
-	 * @return The created state backend. 
-	 * 
-	 * @throws Exception Exceptions during instantiation can be forwarded.
-	 */
-	StateBackend<T> createFromConfig(Configuration config) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
deleted file mode 100644
index 0fa5952..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-import java.io.InputStream;
-
-/**
- * A state handle that produces an input stream when resolved.
- */
-public interface StreamStateHandle extends StateHandle<InputStream> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
deleted file mode 100644
index c4a376e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-
-import java.io.IOException;
-
-/**
- * Base class for state that is stored in a file.
- */
-public abstract class AbstractFileState implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 350284443258002355L;
-	
-	/** The path to the file in the filesystem, fully describing the file system */
-	private final Path filePath;
-
-	/** Cached file system handle */
-	private transient FileSystem fs;
-
-	/**
-	 * Creates a new file state for the given file path.
-	 * 
-	 * @param filePath The path to the file that stores the state.
-	 */
-	protected AbstractFileState(Path filePath) {
-		this.filePath = filePath;
-	}
-
-	/**
-	 * Gets the path where this handle's state is stored.
-	 * @return The path where this handle's state is stored.
-	 */
-	public Path getFilePath() {
-		return filePath;
-	}
-
-	/**
-	 * Discard the state by deleting the file that stores the state. If the parent directory
-	 * of the state is empty after deleting the state file, it is also deleted.
-	 * 
-	 * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
-	 */
-	public void discardState() throws Exception {
-		getFileSystem().delete(filePath, false);
-
-		// send a call to delete the directory containing the file. this will
-		// fail (and be ignored) when some files still exist
-		try {
-			getFileSystem().delete(filePath.getParent(), false);
-		} catch (IOException ignored) {}
-	}
-
-	/**
-	 * Gets the file system that stores the file state.
-	 * @return The file system that stores the file state.
-	 * @throws IOException Thrown if the file system cannot be accessed.
-	 */
-	protected FileSystem getFileSystem() throws IOException {
-		if (fs == null) {
-			fs = FileSystem.get(filePath.toUri());
-		}
-		return fs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
deleted file mode 100644
index 9bf5ec1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.ObjectInputStream;
-
-/**
- * A state handle that points to state stored in a file via Java Serialization.
- * 
- * @param <T> The type of state pointed to by the state handle.
- */
-public class FileSerializableStateHandle<T> extends AbstractFileState implements StateHandle<T> {
-
-	private static final long serialVersionUID = -657631394290213622L;
-	
-	/**
-	 * Creates a new FileSerializableStateHandle pointing to state at the given file path.
-	 * 
-	 * @param filePath The path to the file containing the checkpointed state.
-	 */
-	public FileSerializableStateHandle(Path filePath) {
-		super(filePath);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public T getState(ClassLoader classLoader) throws Exception {
-		FSDataInputStream inStream = getFileSystem().open(getFilePath());
-		ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
-		return (T) ois.readObject();
-	}
-}