You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:21 UTC

[30/47] flink git commit: [FLINK-2354] [runtime] Replace old StateHandleProvider by StateStorageHelper in ZooKeeperStateHandleStore

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
deleted file mode 100644
index 79512d7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.state.StreamStateHandle;
-
-import java.io.InputStream;
-
-/**
- * A state handle that points to state in a file system, accessible as an input stream.
- */
-public class FileStreamStateHandle extends AbstractFileState implements StreamStateHandle {
-	
-	private static final long serialVersionUID = -6826990484549987311L;
-
-	/**
-	 * Creates a new FileStreamStateHandle pointing to state at the given file path.
-	 * 
-	 * @param filePath The path to the file containing the checkpointed state.
-	 */
-	public FileStreamStateHandle(Path filePath) {
-		super(filePath);
-	}
-
-	@Override
-	public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
-		return getFileSystem().open(getFilePath());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
deleted file mode 100644
index 107a3be..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.state.AbstractHeapKvState;
-
-import java.io.DataOutputStream;
-import java.util.HashMap;
-
-/**
- * Heap-backed key/value state that is snapshotted into files.
- * 
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, FsStateBackend> {
-	
-	/** The file system state backend backing snapshots of this state */
-	private final FsStateBackend backend;
-	
-	/**
-	 * Creates a new and empty key/value state.
-	 * 
-	 * @param keySerializer The serializer for the key.
-	 * @param valueSerializer The serializer for the value.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
-	 * @param backend The file system state backend backing snapshots of this state
-	 */
-	public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-							V defaultValue, FsStateBackend backend) {
-		super(keySerializer, valueSerializer, defaultValue);
-		this.backend = backend;
-	}
-
-	/**
-	 * Creates a new key/value state with the given state contents.
-	 * This method is used to re-create key/value state with existing data, for example from
-	 * a snapshot.
-	 * 
-	 * @param keySerializer The serializer for the key.
-	 * @param valueSerializer The serializer for the value.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
-	 * @param state The map of key/value pairs to initialize the state with.
-	 * @param backend The file system state backend backing snapshots of this state
-	 */
-	public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-							V defaultValue, HashMap<K, V> state, FsStateBackend backend) {
-		super(keySerializer, valueSerializer, defaultValue, state);
-		this.backend = backend;
-	}
-
-	
-	@Override
-	public FsHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
-		// first, create an output stream to write to
-		try (FsStateBackend.FsCheckpointStateOutputStream out = 
-					backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
-
-			// serialize the state to the output stream
-			OutputViewDataOutputStreamWrapper outView = 
-					new OutputViewDataOutputStreamWrapper(new DataOutputStream(out));
-			outView.writeInt(size());
-			writeStateToOutputView(outView);
-			outView.flush();
-			
-			// create a handle to the state
-			return new FsHeapKvStateSnapshot<>(getKeySerializer(), getValueSerializer(), out.closeAndGetPath());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
deleted file mode 100644
index c7117f8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
-
-import java.io.DataInputStream;
-import java.util.HashMap;
-
-/**
- * A snapshot of a heap key/value state stored in a file.
- * 
- * @param <K> The type of the key in the snapshot state.
- * @param <V> The type of the value in the snapshot state.
- */
-public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements KvStateSnapshot<K, V, FsStateBackend> {
-	
-	private static final long serialVersionUID = 1L;
-
-	/** Name of the key serializer class */
-	private final String keySerializerClassName;
-
-	/** Name of the value serializer class */
-	private final String valueSerializerClassName;
-
-	/**
-	 * Creates a new state snapshot with data in the file system.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param valueSerializer The serializer for the values.
-	 * @param filePath The path where the snapshot data is stored.
-	 */
-	public FsHeapKvStateSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, Path filePath) {
-		super(filePath);
-		this.keySerializerClassName = keySerializer.getClass().getName();
-		this.valueSerializerClassName = valueSerializer.getClass().getName();
-	}
-
-	@Override
-	public FsHeapKvState<K, V> restoreState(
-			FsStateBackend stateBackend,
-			final TypeSerializer<K> keySerializer,
-			final TypeSerializer<V> valueSerializer,
-			V defaultValue,
-			ClassLoader classLoader) throws Exception {
-
-		// validity checks
-		if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
-				!valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
-			throw new IllegalArgumentException(
-					"Cannot restore the state from the snapshot with the given serializers. " +
-							"State (K/V) was serialized with (" + valueSerializerClassName +
-							"/" + keySerializerClassName + ")");
-		}
-		
-		// state restore
-		try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
-			InputViewDataInputStreamWrapper inView = new InputViewDataInputStreamWrapper(new DataInputStream(inStream));
-			
-			final int numEntries = inView.readInt();
-			HashMap<K, V> stateMap = new HashMap<>(numEntries);
-			
-			for (int i = 0; i < numEntries; i++) {
-				K key = keySerializer.deserialize(inView);
-				V value = valueSerializer.deserialize(inView);
-				stateMap.put(key, value);
-			}
-			
-			return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap, stateBackend);
-		}
-		catch (Exception e) {
-			throw new Exception("Failed to restore state from file system", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
deleted file mode 100644
index 3cbd227..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.StateBackend;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.UUID;
-
-/**
- * The file state backend is a state backend that stores the state of streaming jobs in a file system.
- * 
- * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
- * files for each state, for example:
- * 
- * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
- */
-public class FsStateBackend extends StateBackend<FsStateBackend> {
-
-	private static final long serialVersionUID = -8191916350224044011L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);
-	
-	
-	/** The path to the directory for the checkpoint data, including the file system
-	 * description via scheme and optional authority */
-	private final Path basePath;
-	
-	/** The directory (job specific) into this initialized instance of the backend stores its data */
-	private transient Path checkpointDirectory;
-	
-	/** Cached handle to the file system for file operations */
-	private transient FileSystem filesystem;
-
-
-	/**
-	 * Creates a new state backend that stores its checkpoint data in the file system and location
-	 * defined by the given URI.
-	 *
-	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
-	 * must be accessible via {@link FileSystem#get(URI)}.
-	 *
-	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
-	 * (host and port), or that the Hadoop configuration that describes that information must be in the
-	 * classpath.
-	 *
-	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
-	 *                          and the path to teh checkpoint data directory.
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
-	 */
-	public FsStateBackend(String checkpointDataUri) throws IOException {
-		this(new Path(checkpointDataUri));
-	}
-
-	/**
-	 * Creates a new state backend that stores its checkpoint data in the file system and location
-	 * defined by the given URI.
-	 *
-	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
-	 * must be accessible via {@link FileSystem#get(URI)}.
-	 *
-	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
-	 * (host and port), or that the Hadoop configuration that describes that information must be in the
-	 * classpath.
-	 *
-	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
-	 *                          and the path to teh checkpoint data directory.
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
-	 */
-	public FsStateBackend(Path checkpointDataUri) throws IOException {
-		this(checkpointDataUri.toUri());
-	}
-
-	/**
-	 * Creates a new state backend that stores its checkpoint data in the file system and location
-	 * defined by the given URI.
-	 * 
-	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
-	 * must be accessible via {@link FileSystem#get(URI)}.
-	 * 
-	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
-	 * (host and port), or that the Hadoop configuration that describes that information must be in the
-	 * classpath.
-	 * 
-	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
-	 *                          and the path to teh checkpoint data directory.
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
-	 */
-	public FsStateBackend(URI checkpointDataUri) throws IOException {
-		final String scheme = checkpointDataUri.getScheme();
-		final String path = checkpointDataUri.getPath();
-		
-		// some validity checks
-		if (scheme == null) {
-			throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
-					"Please specify the file system scheme explicitly in the URI.");
-		}
-		if (path == null) {
-			throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
-					"Please specify a directory path for the checkpoint data.");
-		}
-		if (path.length() == 0 || path.equals("/")) {
-			throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
-		}
-		
-		// we do a bit of work to make sure that the URI for the filesystem refers to exactly the same
-		// (distributed) filesystem on all hosts and includes full host/port information, even if the
-		// original URI did not include that. We count on the filesystem loading from the configuration
-		// to fill in the missing data.
-		
-		// try to grab the file system for this path/URI
-		this.filesystem = FileSystem.get(checkpointDataUri);
-		if (this.filesystem == null) {
-			throw new IOException("Could not find a file system for the given scheme in the available configurations.");
-		}
-
-		URI fsURI = this.filesystem.getUri();
-		try {
-			URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null);
-			this.basePath = new Path(baseURI);
-		}
-		catch (URISyntaxException e) {
-			throw new IOException(
-					String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s", 
-							checkpointDataUri, fsURI), e);
-		}
-	}
-
-	/**
-	 * Gets the base directory where all state-containing files are stored.
-	 * The job specific directory is created inside this directory.
-	 * 
-	 * @return The base directory.
-	 */
-	public Path getBasePath() {
-		return basePath;
-	}
-
-	/**
-	 * Gets the directory where this state backend stores its checkpoint data. Will be null if
-	 * the state backend has not been initialized.
-	 * 
-	 * @return The directory where this state backend stores its checkpoint data.
-	 */
-	public Path getCheckpointDirectory() {
-		return checkpointDirectory;
-	}
-
-	/**
-	 * Checks whether this state backend is initialized. Note that initialization does not carry
-	 * across serialization. After each serialization, the state backend needs to be initialized.
-	 * 
-	 * @return True, if the file state backend has been initialized, false otherwise.
-	 */
-	public boolean isInitialized() {
-		return filesystem != null && checkpointDirectory != null; 
-	}
-
-	/**
-	 * Gets the file system handle for the file system that stores the state for this backend.
-	 * 
-	 * @return This backend's file system handle.
-	 */
-	public FileSystem getFileSystem() {
-		if (filesystem != null) {
-			return filesystem;
-		}
-		else {
-			throw new IllegalStateException("State backend has not been initialized.");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  initialization and cleanup
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void initializeForJob(JobID jobId) throws Exception {
-		Path dir = new Path(basePath, jobId.toString());
-		
-		LOG.info("Initializing file state backend to URI " + dir);
-		
-		filesystem = basePath.getFileSystem();
-		filesystem.mkdirs(dir);
-
-		checkpointDirectory = dir;
-	}
-
-	@Override
-	public void disposeAllStateForCurrentJob() throws Exception {
-		FileSystem fs = this.filesystem;
-		Path dir = this.checkpointDirectory;
-		
-		if (fs != null && dir != null) {
-			this.filesystem = null;
-			this.checkpointDirectory = null;
-			fs.delete(dir, true);
-		}
-		else {
-			throw new IllegalStateException("state backend has not been initialized");
-		}
-	}
-
-	@Override
-	public void close() throws Exception {}
-
-	// ------------------------------------------------------------------------
-	//  state backend operations
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public <K, V> FsHeapKvState<K, V> createKvState(
-			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception {
-		return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, this);
-	}
-
-	@Override
-	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
-			S state, long checkpointID, long timestamp) throws Exception
-	{
-		checkFileSystemInitialized();
-
-		// make sure the directory for that specific checkpoint exists
-		final Path checkpointDir = createCheckpointDirPath(checkpointID);
-		filesystem.mkdirs(checkpointDir);
-
-		
-		Exception latestException = null;
-
-		for (int attempt = 0; attempt < 10; attempt++) {
-			Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString());
-			FSDataOutputStream outStream;
-			try {
-				outStream = filesystem.create(targetPath, false);
-			}
-			catch (Exception e) {
-				latestException = e;
-				continue;
-			}
-
-			ObjectOutputStream os = new ObjectOutputStream(outStream);
-			os.writeObject(state);
-			os.close();
-			return new FileSerializableStateHandle<S>(targetPath);
-		}
-		
-		throw new Exception("Could not open output stream for state backend", latestException);
-	}
-	
-	@Override
-	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
-		checkFileSystemInitialized();
-		
-		final Path checkpointDir = createCheckpointDirPath(checkpointID);
-		filesystem.mkdirs(checkpointDir);
-		
-		Exception latestException = null;
-		
-		for (int attempt = 0; attempt < 10; attempt++) {
-			Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString());
-			try {
-				FSDataOutputStream outStream = filesystem.create(targetPath, false);
-				return new FsCheckpointStateOutputStream(outStream, targetPath, filesystem);
-			}
-			catch (Exception e) {
-				latestException = e;
-			}
-		}
-		throw new Exception("Could not open output stream for state backend", latestException);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private void checkFileSystemInitialized() throws IllegalStateException {
-		if (filesystem == null || checkpointDirectory == null) {
-			throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
-		}
-	}
-	
-	private Path createCheckpointDirPath(long checkpointID) {
-		return new Path(checkpointDirectory, "chk-" + checkpointID);
-	}
-	
-	@Override
-	public String toString() {
-		return checkpointDirectory == null ?
-			"File State Backend @ " + basePath : 
-			"File State Backend (initialized) @ " + checkpointDirectory;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Output stream for state checkpointing
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A CheckpointStateOutputStream that writes into a file and returns the path to that file upon
-	 * closing.
-	 */
-	public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream {
-
-		private final FSDataOutputStream outStream;
-		
-		private final Path filePath;
-		
-		private final FileSystem fs;
-		
-		private boolean closed;
-
-		FsCheckpointStateOutputStream(FSDataOutputStream outStream, Path filePath, FileSystem fs) {
-			this.outStream = outStream;
-			this.filePath = filePath;
-			this.fs = fs;
-		}
-
-
-		@Override
-		public void write(int b) throws IOException {
-			outStream.write(b);
-		}
-
-		@Override
-		public void write(byte[] b, int off, int len) throws IOException {
-			outStream.write(b, off, len);
-		}
-
-		@Override
-		public void flush() throws IOException {
-			outStream.flush();
-		}
-
-		/**
-		 * If the stream is only closed, we remove the produced file (cleanup through the auto close
-		 * feature, for example). This method throws no exception if the deletion fails, but only
-		 * logs the error.
-		 */
-		@Override
-		public void close() {
-			synchronized (this) {
-				if (!closed) {
-					closed = true;
-					try {
-						outStream.close();
-						fs.delete(filePath, false);
-						
-						// attempt to delete the parent (will fail and be ignored if the parent has more files)
-						try {
-							fs.delete(filePath.getParent(), false);
-						} catch (IOException ignored) {}
-					}
-					catch (Exception e) {
-						LOG.warn("Cannot delete closed and discarded state stream to " + filePath, e);
-					}
-				}
-			}
-		}
-
-		@Override
-		public FileStreamStateHandle closeAndGetHandle() throws IOException {
-			return new FileStreamStateHandle(closeAndGetPath());
-		}
-
-		/**
-		 * Closes the stream and returns the path to the file that contains the stream's data.
-		 * @return The path to the file that contains the stream's data.
-		 * @throws IOException Thrown if the stream cannot be successfully closed.
-		 */
-		public Path closeAndGetPath() throws IOException {
-			synchronized (this) {
-				if (!closed) {
-					closed = true;
-					outStream.close();
-					return filePath;
-				}
-				else {
-					throw new IOException("Stream has already been closed and discarded.");
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
deleted file mode 100644
index f0ad6bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.state.StateBackendFactory;
-
-/**
- * A factory that creates an {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend}
- * from a configuration.
- */
-public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
-	
-	/** The key under which the config stores the directory where checkpoints should be stored */
-	public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
-	
-	
-	@Override
-	public FsStateBackend createFromConfig(Configuration config) throws Exception {
-		String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-
-		if (checkpointDirURI == null) {
-			throw new IllegalConfigurationException(
-					"Cannot create the file system state backend: The configuration does not specify the " +
-							"checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
-		}
-		
-		try {
-			Path path = new Path(checkpointDirURI);
-			return new FsStateBackend(path);
-		}
-		catch (IllegalArgumentException e) {
-			throw new Exception("Cannot initialize File System State Backend with URI '"
-					+ checkpointDirURI + '.', e);
-		}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
deleted file mode 100644
index 7952e58..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.streaming.api.state.StreamStateHandle;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
-/**
- * A state handle that contains stream state in a byte array.
- */
-public final class ByteStreamStateHandle implements StreamStateHandle {
-
-	private static final long serialVersionUID = -5280226231200217594L;
-	
-	/** the state data */
-	private final byte[] data;
-
-	/**
-	 * Creates a new ByteStreamStateHandle containing the given data.
-	 * 
-	 * @param data The state data.
-	 */
-	public ByteStreamStateHandle(byte[] data) {
-		this.data = data;
-	}
-
-	@Override
-	public InputStream getState(ClassLoader userCodeClassLoader) {
-		return new ByteArrayInputStream(data);
-	}
-
-	@Override
-	public void discardState() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
deleted file mode 100644
index e611887..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.api.state.AbstractHeapKvState;
-
-import java.util.HashMap;
-
-/**
- * Heap-backed key/value state that is snapshotted into a serialized memory copy.
- *
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, MemoryStateBackend> {
-	
-	public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
-		super(keySerializer, valueSerializer, defaultValue);
-	}
-
-	public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-							V defaultValue, HashMap<K, V> state) {
-		super(keySerializer, valueSerializer, defaultValue, state);
-	}
-	
-	@Override
-	public MemoryHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
-		DataOutputSerializer ser = new DataOutputSerializer(Math.max(size() * 16, 16));
-		writeStateToOutputView(ser);
-		byte[] bytes = ser.getCopyOfBuffer();
-		
-		return new MemoryHeapKvStateSnapshot<K, V>(getKeySerializer(), getValueSerializer(), bytes, size());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
deleted file mode 100644
index 7f50379..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
-
-import java.util.HashMap;
-
-/**
- * A snapshot of a {@link MemHeapKvState} for a checkpoint. The data is stored in a heap byte
- * array, in serialized form.
- * 
- * @param <K> The type of the key in the snapshot state.
- * @param <V> The type of the value in the snapshot state.
- */
-public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, MemoryStateBackend> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	/** Name of the key serializer class */
-	private final String keySerializerClassName;
-
-	/** Name of the value serializer class */
-	private final String valueSerializerClassName;
-	
-	/** The serialized data of the state key/value pairs */
-	private final byte[] data;
-	
-	/** The number of key/value pairs */
-	private final int numEntries;
-
-	/**
-	 * Creates a new heap memory state snapshot.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param valueSerializer The serializer for the values.
-	 * @param data The serialized data of the state key/value pairs
-	 * @param numEntries The number of key/value pairs
-	 */
-	public MemoryHeapKvStateSnapshot(TypeSerializer<K> keySerializer,
-						TypeSerializer<V> valueSerializer, byte[] data, int numEntries) {
-		this.keySerializerClassName = keySerializer.getClass().getName();
-		this.valueSerializerClassName = valueSerializer.getClass().getName();
-		this.data = data;
-		this.numEntries = numEntries;
-	}
-
-
-	@Override
-	public MemHeapKvState<K, V> restoreState(
-			MemoryStateBackend stateBackend,
-			final TypeSerializer<K> keySerializer,
-			final TypeSerializer<V> valueSerializer,
-			V defaultValue,
-			ClassLoader classLoader) throws Exception {
-
-		// validity checks
-		if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
-			!valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
-				throw new IllegalArgumentException(
-						"Cannot restore the state from the snapshot with the given serializers. " +
-						"State (K/V) was serialized with (" + valueSerializerClassName + 
-						"/" + keySerializerClassName + ")");
-		}
-		
-		// restore state
-		HashMap<K, V> stateMap = new HashMap<>(numEntries);
-		DataInputDeserializer in = new DataInputDeserializer(data, 0, data.length);
-		
-		for (int i = 0; i < numEntries; i++) {
-			K key = keySerializer.deserialize(in);
-			V value = valueSerializer.deserialize(in);
-			stateMap.put(key, value);
-		}
-		
-		return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap);
-	}
-
-	/**
-	 * Discarding the heap state is a no-op.
-	 */
-	@Override
-	public void discardState() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
deleted file mode 100644
index 05368bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.StreamStateHandle;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A {@link StateBackend} that stores all its data and checkpoints in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the serialized data is
- * transferred 
- */
-public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
-
-	private static final long serialVersionUID = 4109305377809414635L;
-	
-	/** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
-	private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
-	
-	/** The maximal size that the snapshotted memory state may have */
-	private final int maxStateSize;
-
-	/**
-	 * Creates a new memory state backend that accepts states whose serialized forms are
-	 * up to the default state size (5 MB).
-	 */
-	public MemoryStateBackend() {
-		this(DEFAULT_MAX_STATE_SIZE);
-	}
-
-	/**
-	 * Creates a new memory state backend that accepts states whose serialized forms are
-	 * up to the given number of bytes.
-	 * 
-	 * @param maxStateSize The maximal size of the serialized state
-	 */
-	public MemoryStateBackend(int maxStateSize) {
-		this.maxStateSize = maxStateSize;
-	}
-
-	// ------------------------------------------------------------------------
-	//  initialization and cleanup
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void initializeForJob(JobID job) {
-		// nothing to do here
-	}
-
-	@Override
-	public void disposeAllStateForCurrentJob() {
-		// nothing to do here, GC will do it
-	}
-
-	@Override
-	public void close() throws Exception {}
-
-	// ------------------------------------------------------------------------
-	//  State backend operations
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public <K, V> MemHeapKvState<K, V> createKvState(
-			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
-		return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue);
-	}
-	
-	/**
-	 * Serialized the given state into bytes using Java serialization and creates a state handle that
-	 * can re-create that state.
-	 * 
-	 * @param state The state to checkpoint.
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @param <S> The type of the state.
-	 * 
-	 * @return A state handle that contains the given state serialized as bytes.
-	 * @throws Exception Thrown, if the serialization fails.
-	 */
-	@Override
-	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
-			S state, long checkpointID, long timestamp) throws Exception
-	{
-		SerializedStateHandle<S> handle = new SerializedStateHandle<>(state);
-		checkSize(handle.getSizeOfSerializedState(), maxStateSize);
-		return new SerializedStateHandle<S>(state);
-	}
-
-	@Override
-	public CheckpointStateOutputStream createCheckpointStateOutputStream(
-			long checkpointID, long timestamp) throws Exception
-	{
-		return new MemoryCheckpointOutputStream(maxStateSize);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
-	}
-
-	static void checkSize(int size, int maxSize) throws IOException {
-		if (size > maxSize) {
-			throw new IOException(
-					"Size of the state is larger than the maximum permitted memory-backed state. Size="
-							+ size + " , maxSize=" + maxSize
-							+ " . Consider using a different state backend, like the File System State backend.");
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A CheckpointStateOutputStream that writes into a byte array.
-	 */
-	public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {
-		
-		private final ByteArrayOutputStream os = new ByteArrayOutputStream();
-		
-		private final int maxSize;
-		
-		private boolean closed;
-
-		public MemoryCheckpointOutputStream(int maxSize) {
-			this.maxSize = maxSize;
-		}
-
-		@Override
-		public void write(int b) {
-			os.write(b);
-		}
-
-		@Override
-		public void write(byte[] b, int off, int len) {
-			os.write(b, off, len);
-		}
-
-		// --------------------------------------------------------------------
-
-		@Override
-		public void close() {
-			closed = true;
-			os.reset();
-		}
-
-		@Override
-		public StreamStateHandle closeAndGetHandle() throws IOException {
-			return new ByteStreamStateHandle(closeAndGetBytes());
-		}
-
-		/**
-		 * Closes the stream and returns the byte array containing the stream's data.
-		 * @return The byte array containing the stream's data.
-		 * @throws IOException Thrown if the size of the data exceeds the maximal 
-		 */
-		public byte[] closeAndGetBytes() throws IOException {
-			if (!closed) {
-				checkSize(os.size(), maxSize);
-				byte[] bytes = os.toByteArray();
-				close();
-				return bytes;
-			}
-			else {
-				throw new IllegalStateException("stream has already been closed");
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Static default instance
-	// ------------------------------------------------------------------------
-	
-	/** The default instance of this state backend, using the default maximal state size */
-	private static final MemoryStateBackend DEFAULT_INSTANCE = new MemoryStateBackend();
-
-	/**
-	 * Gets the default instance of this state backend, using the default maximal state size.
-	 * @return The default instance of this state backend.
-	 */
-	public static MemoryStateBackend defaultInstance() {
-		return DEFAULT_INSTANCE;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
deleted file mode 100644
index 163cadd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.IOException;
-
-/**
- * A state handle that represents its state in serialized form as bytes.
- *
- * @param <T> The type of state represented by this state handle.
- */
-public class SerializedStateHandle<T> extends SerializedValue<T> implements StateHandle<T> {
-	
-	private static final long serialVersionUID = 4145685722538475769L;
-
-	public SerializedStateHandle(T value) throws IOException {
-		super(value);
-	}
-	
-	@Override
-	public T getState(ClassLoader classLoader) throws Exception {
-		return deserializeValue(classLoader);
-	}
-
-	/**
-	 * Discarding heap-memory backed state is a no-op, so this method does nothing.
-	 */
-	@Override
-	public void discardState() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index cf8575e..9964760 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 72a8c25..8c58e29 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,17 +30,16 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.StateBackendFactory;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -493,55 +492,52 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	
 	private StateBackend<?> createStateBackend() throws Exception {
 		StateBackend<?> configuredBackend = configuration.getStateBackend(userClassLoader);
-		
+
 		if (configuredBackend != null) {
 			// backend has been configured on the environment
 			LOG.info("Using user-defined state backend: " + configuredBackend);
 			return configuredBackend;
-		}
-		else {
+		} else {
 			// see if we have a backend specified in the configuration
 			Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
 			String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
-			
+
 			if (backendName == null) {
 				LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
 				backendName = "jobmanager";
 			}
-			
+
 			backendName = backendName.toLowerCase();
 			switch (backendName) {
 				case "jobmanager":
 					LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
 					return MemoryStateBackend.defaultInstance();
-				
+
 				case "filesystem":
 					FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
 					LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
-							+ backend.getBasePath() + "\")");
+						+ backend.getBasePath() + "\")");
 					return backend;
-				
+
 				default:
 					try {
 						@SuppressWarnings("rawtypes")
 						Class<? extends StateBackendFactory> clazz =
-								Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
+							Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
 
 						return (StateBackend<?>) clazz.newInstance();
-					}
-					catch (ClassNotFoundException e) {
+					} catch (ClassNotFoundException e) {
 						throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
-					}
-					catch (ClassCastException e) {
+					} catch (ClassCastException e) {
 						throw new IllegalConfigurationException("The class configured under '" +
-								ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
-								backendName + ')');
-					}
-					catch (Throwable t) {
+							ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
+							backendName + ')');
+					} catch (Throwable t) {
 						throw new IllegalConfigurationException("Cannot create configured state backend", t);
 					}
 			}
 		}
+	}
 
 	/**
 	 * Registers a timer.

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
index 334fd44..afeabd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.KvStateSnapshot;
 
 import java.io.Serializable;
 import java.util.ConcurrentModificationException;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
deleted file mode 100644
index 73100d1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.OperatingSystem;
-
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.Random;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-public class FileStateBackendTest {
-	
-	@Test
-	public void testSetupAndSerialization() {
-		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
-		try {
-			final String backendDir = localFileUri(tempDir);
-			FsStateBackend originalBackend = new FsStateBackend(backendDir);
-			
-			assertFalse(originalBackend.isInitialized());
-			assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri());
-			assertNull(originalBackend.getCheckpointDirectory());
-			
-			// serialize / copy the backend
-			FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
-			assertFalse(backend.isInitialized());
-			assertEquals(new URI(backendDir), backend.getBasePath().toUri());
-			assertNull(backend.getCheckpointDirectory());
-			
-			// no file operations should be possible right now
-			try {
-				backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
-				fail("should fail with an exception");
-			} catch (IllegalStateException e) {
-				// supreme!
-			}
-			
-			backend.initializeForJob(new JobID());
-			assertNotNull(backend.getCheckpointDirectory());
-			
-			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-			assertTrue(checkpointDir.exists());
-			assertTrue(isDirectoryEmpty(checkpointDir));
-			
-			backend.disposeAllStateForCurrentJob();
-			assertNull(backend.getCheckpointDirectory());
-			
-			assertTrue(isDirectoryEmpty(tempDir));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			deleteDirectorySilently(tempDir);
-		}
-	}
-	
-	@Test
-	public void testSerializableState() {
-		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
-		try {
-			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new JobID());
-
-			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
-			String state1 = "dummy state";
-			String state2 = "row row row your boat";
-			Integer state3 = 42;
-			
-			StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
-			StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
-			StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
-
-			assertFalse(isDirectoryEmpty(checkpointDir));
-			assertEquals(state1, handle1.getState(getClass().getClassLoader()));
-			handle1.discardState();
-			
-			assertFalse(isDirectoryEmpty(checkpointDir));
-			assertEquals(state2, handle2.getState(getClass().getClassLoader()));
-			handle2.discardState();
-			
-			assertFalse(isDirectoryEmpty(checkpointDir));
-			assertEquals(state3, handle3.getState(getClass().getClassLoader()));
-			handle3.discardState();
-			
-			assertTrue(isDirectoryEmpty(checkpointDir));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			deleteDirectorySilently(tempDir);
-		}
-	}
-
-	@Test
-	public void testStateOutputStream() {
-		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
-		try {
-			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new JobID());
-
-			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
-			byte[] state1 = new byte[1274673];
-			byte[] state2 = new byte[1];
-			byte[] state3 = new byte[0];
-			byte[] state4 = new byte[177];
-			
-			Random rnd = new Random();
-			rnd.nextBytes(state1);
-			rnd.nextBytes(state2);
-			rnd.nextBytes(state3);
-			rnd.nextBytes(state4);
-
-			long checkpointId = 97231523452L;
-
-			FsStateBackend.FsCheckpointStateOutputStream stream1 = 
-					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-			FsStateBackend.FsCheckpointStateOutputStream stream2 =
-					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-			FsStateBackend.FsCheckpointStateOutputStream stream3 =
-					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-			
-			stream1.write(state1);
-			stream2.write(state2);
-			stream3.write(state3);
-			
-			FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
-			FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
-			FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
-			
-			// use with try-with-resources
-			StreamStateHandle handle4;
-			try (StateBackend.CheckpointStateOutputStream stream4 =
-					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
-				stream4.write(state4);
-				handle4 = stream4.closeAndGetHandle();
-			}
-			
-			// close before accessing handle
-			StateBackend.CheckpointStateOutputStream stream5 =
-					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-			stream5.write(state4);
-			stream5.close();
-			try {
-				stream5.closeAndGetHandle();
-				fail();
-			} catch (IOException e) {
-				// uh-huh
-			}
-			
-			validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
-			handle1.discardState();
-			assertFalse(isDirectoryEmpty(checkpointDir));
-			ensureLocalFileDeleted(handle1.getFilePath());
-			
-			validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
-			handle2.discardState();
-			assertFalse(isDirectoryEmpty(checkpointDir));
-			ensureLocalFileDeleted(handle2.getFilePath());
-			
-			validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
-			handle3.discardState();
-			assertFalse(isDirectoryEmpty(checkpointDir));
-			ensureLocalFileDeleted(handle3.getFilePath());
-			
-			validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
-			handle4.discardState();
-			assertTrue(isDirectoryEmpty(checkpointDir));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			deleteDirectorySilently(tempDir);
-		}
-	}
-
-	@Test
-	public void testKeyValueState() {
-		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
-		try {
-			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new JobID());
-
-			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
-			KvState<Integer, String, FsStateBackend> kv =
-					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
-			assertEquals(0, kv.size());
-
-			// some modifications to the state
-			kv.setCurrentKey(1);
-			assertNull(kv.value());
-			kv.update("1");
-			assertEquals(1, kv.size());
-			kv.setCurrentKey(2);
-			assertNull(kv.value());
-			kv.update("2");
-			assertEquals(2, kv.size());
-			kv.setCurrentKey(1);
-			assertEquals("1", kv.value());
-			assertEquals(2, kv.size());
-
-			// draw a snapshot
-			KvStateSnapshot<Integer, String, FsStateBackend> snapshot1 =
-					kv.shapshot(682375462378L, System.currentTimeMillis());
-
-			// make some more modifications
-			kv.setCurrentKey(1);
-			kv.update("u1");
-			kv.setCurrentKey(2);
-			kv.update("u2");
-			kv.setCurrentKey(3);
-			kv.update("u3");
-
-			// draw another snapshot
-			KvStateSnapshot<Integer, String, FsStateBackend> snapshot2 =
-					kv.shapshot(682375462379L, System.currentTimeMillis());
-
-			// validate the original state
-			assertEquals(3, kv.size());
-			kv.setCurrentKey(1);
-			assertEquals("u1", kv.value());
-			kv.setCurrentKey(2);
-			assertEquals("u2", kv.value());
-			kv.setCurrentKey(3);
-			assertEquals("u3", kv.value());
-
-			// restore the first snapshot and validate it
-			KvState<Integer, String, FsStateBackend> restored1 = snapshot1.restoreState(backend,
-					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
-			assertEquals(2, restored1.size());
-			restored1.setCurrentKey(1);
-			assertEquals("1", restored1.value());
-			restored1.setCurrentKey(2);
-			assertEquals("2", restored1.value());
-
-			// restore the first snapshot and validate it
-			KvState<Integer, String, FsStateBackend> restored2 = snapshot2.restoreState(backend,
-					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
-			assertEquals(3, restored2.size());
-			restored2.setCurrentKey(1);
-			assertEquals("u1", restored2.value());
-			restored2.setCurrentKey(2);
-			assertEquals("u2", restored2.value());
-			restored2.setCurrentKey(3);
-			assertEquals("u3", restored2.value());
-
-			snapshot1.discardState();
-			assertFalse(isDirectoryEmpty(checkpointDir));
-
-			snapshot2.discardState();
-			assertTrue(isDirectoryEmpty(checkpointDir));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			deleteDirectorySilently(tempDir);
-		}
-	}
-
-	@Test
-	public void testRestoreWithWrongSerializers() {
-		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
-		try {
-			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new JobID());
-
-			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-			
-			KvState<Integer, String, FsStateBackend> kv =
-					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
-			kv.setCurrentKey(1);
-			kv.update("1");
-			kv.setCurrentKey(2);
-			kv.update("2");
-
-			KvStateSnapshot<Integer, String, FsStateBackend> snapshot =
-					kv.shapshot(682375462378L, System.currentTimeMillis());
-
-
-			@SuppressWarnings("unchecked")
-			TypeSerializer<Integer> fakeIntSerializer =
-					(TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
-
-			@SuppressWarnings("unchecked")
-			TypeSerializer<String> fakeStringSerializer =
-					(TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
-
-			try {
-				snapshot.restoreState(backend, fakeIntSerializer,
-						StringSerializer.INSTANCE, null, getClass().getClassLoader());
-				fail("should recognize wrong serializers");
-			} catch (IllegalArgumentException e) {
-				// expected
-			} catch (Exception e) {
-				fail("wrong exception");
-			}
-
-			try {
-				snapshot.restoreState(backend, IntSerializer.INSTANCE,
-						fakeStringSerializer, null, getClass().getClassLoader());
-				fail("should recognize wrong serializers");
-			} catch (IllegalArgumentException e) {
-				// expected
-			} catch (Exception e) {
-				fail("wrong exception");
-			}
-
-			try {
-				snapshot.restoreState(backend, fakeIntSerializer,
-						fakeStringSerializer, null, getClass().getClassLoader());
-				fail("should recognize wrong serializers");
-			} catch (IllegalArgumentException e) {
-				// expected
-			} catch (Exception e) {
-				fail("wrong exception");
-			}
-			
-			snapshot.discardState();
-
-			assertTrue(isDirectoryEmpty(checkpointDir));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			deleteDirectorySilently(tempDir);
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	
-	private static void ensureLocalFileDeleted(Path path) {
-		URI uri = path.toUri();
-		if ("file".equals(uri.getScheme())) {
-			File file = new File(uri.getPath());
-			assertFalse("file not properly deleted", file.exists());
-		}
-		else {
-			throw new IllegalArgumentException("not a local path");
-		}
-	}
-	
-	private static void deleteDirectorySilently(File dir) {
-		try {
-			FileUtils.deleteDirectory(dir);
-		}
-		catch (IOException ignored) {}
-	}
-	
-	private static boolean isDirectoryEmpty(File directory) {
-		String[] nested = directory.list();
-		return  nested == null || nested.length == 0;
-	}
-	
-	private static String localFileUri(File path) {
-		return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath();
-	}
-	
-	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
-		byte[] holder = new byte[data.length];
-		assertEquals("not enough data", holder.length, is.read(holder));
-		assertEquals("too much data", -1, is.read());
-		assertArrayEquals("wrong data", data, holder);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
deleted file mode 100644
index 3410d09..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend}.
- */
-public class MemoryStateBackendTest {
-	
-	@Test
-	public void testSerializableState() {
-		try {
-			MemoryStateBackend backend = new MemoryStateBackend();
-
-			HashMap<String, Integer> state = new HashMap<>();
-			state.put("hey there", 2);
-			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-			
-			StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459);
-			assertNotNull(handle);
-			
-			HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader());
-			assertEquals(state, restored);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testOversizedState() {
-		try {
-			MemoryStateBackend backend = new MemoryStateBackend(10);
-
-			HashMap<String, Integer> state = new HashMap<>();
-			state.put("hey there", 2);
-			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-
-			try {
-				backend.checkpointStateSerializable(state, 12, 459);
-				fail("this should cause an exception");
-			}
-			catch (IOException e) {
-				// now darling, isn't that exactly what we wanted?
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testStateStream() {
-		try {
-			MemoryStateBackend backend = new MemoryStateBackend();
-
-			HashMap<String, Integer> state = new HashMap<>();
-			state.put("hey there", 2);
-			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-
-			StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
-			ObjectOutputStream oos = new ObjectOutputStream(os);
-			oos.writeObject(state);
-			oos.flush();
-			StreamStateHandle handle = os.closeAndGetHandle();
-			
-			assertNotNull(handle);
-
-			ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader()));
-			assertEquals(state, ois.readObject());
-			assertTrue(ois.available() <= 0);
-			ois.close();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testOversizedStateStream() {
-		try {
-			MemoryStateBackend backend = new MemoryStateBackend(10);
-
-			HashMap<String, Integer> state = new HashMap<>();
-			state.put("hey there", 2);
-			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-
-			StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
-			ObjectOutputStream oos = new ObjectOutputStream(os);
-			
-			try {
-				oos.writeObject(state);
-				oos.flush();
-				os.closeAndGetHandle();
-				fail("this should cause an exception");
-			}
-			catch (IOException e) {
-				// oh boy! what an exception!
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testKeyValueState() {
-		try {
-			MemoryStateBackend backend = new MemoryStateBackend();
-			
-			KvState<Integer, String, MemoryStateBackend> kv = 
-					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-			
-			assertEquals(0, kv.size());
-			
-			// some modifications to the state
-			kv.setCurrentKey(1);
-			assertNull(kv.value());
-			kv.update("1");
-			assertEquals(1, kv.size());
-			kv.setCurrentKey(2);
-			assertNull(kv.value());
-			kv.update("2");
-			assertEquals(2, kv.size());
-			kv.setCurrentKey(1);
-			assertEquals("1", kv.value());
-			assertEquals(2, kv.size());
-			
-			// draw a snapshot
-			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 = 
-					kv.shapshot(682375462378L, System.currentTimeMillis());
-			
-			// make some more modifications
-			kv.setCurrentKey(1);
-			kv.update("u1");
-			kv.setCurrentKey(2);
-			kv.update("u2");
-			kv.setCurrentKey(3);
-			kv.update("u3");
-
-			// draw another snapshot
-			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot2 =
-					kv.shapshot(682375462379L, System.currentTimeMillis());
-			
-			// validate the original state
-			assertEquals(3, kv.size());
-			kv.setCurrentKey(1);
-			assertEquals("u1", kv.value());
-			kv.setCurrentKey(2);
-			assertEquals("u2", kv.value());
-			kv.setCurrentKey(3);
-			assertEquals("u3", kv.value());
-			
-			// restore the first snapshot and validate it
-			KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend, 
-							IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
-			assertEquals(2, restored1.size());
-			restored1.setCurrentKey(1);
-			assertEquals("1", restored1.value());
-			restored1.setCurrentKey(2);
-			assertEquals("2", restored1.value());
-
-			// restore the first snapshot and validate it
-			KvState<Integer, String, MemoryStateBackend> restored2 = snapshot2.restoreState(backend,
-					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
-			assertEquals(3, restored2.size());
-			restored2.setCurrentKey(1);
-			assertEquals("u1", restored2.value());
-			restored2.setCurrentKey(2);
-			assertEquals("u2", restored2.value());
-			restored2.setCurrentKey(3);
-			assertEquals("u3", restored2.value());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testRestoreWithWrongSerializers() {
-		try {
-			MemoryStateBackend backend = new MemoryStateBackend();
-			KvState<Integer, String, MemoryStateBackend> kv =
-					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-			
-			kv.setCurrentKey(1);
-			kv.update("1");
-			kv.setCurrentKey(2);
-			kv.update("2");
-			
-			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
-					kv.shapshot(682375462378L, System.currentTimeMillis());
-
-
-			@SuppressWarnings("unchecked")
-			TypeSerializer<Integer> fakeIntSerializer = 
-					(TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
-
-			@SuppressWarnings("unchecked")
-			TypeSerializer<String> fakeStringSerializer = 
-					(TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
-
-			try {
-				snapshot.restoreState(backend, fakeIntSerializer,
-						StringSerializer.INSTANCE, null, getClass().getClassLoader());
-				fail("should recognize wrong serializers");
-			} catch (IllegalArgumentException e) {
-				// expected
-			} catch (Exception e) {
-				fail("wrong exception");
-			}
-
-			try {
-				snapshot.restoreState(backend, IntSerializer.INSTANCE,
-						fakeStringSerializer, null, getClass().getClassLoader());
-				fail("should recognize wrong serializers");
-			} catch (IllegalArgumentException e) {
-				// expected
-			} catch (Exception e) {
-				fail("wrong exception");
-			}
-
-			try {
-				snapshot.restoreState(backend, fakeIntSerializer,
-						fakeStringSerializer, null, getClass().getClassLoader());
-				fail("should recognize wrong serializers");
-			} catch (IllegalArgumentException e) {
-				// expected
-			} catch (Exception e) {
-				fail("wrong exception");
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index dd76a67..ad3c838 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index ab8e551..4bd260f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 81d3a69..0c708c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -36,8 +36,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index b83feca..01f95bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;