You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/11/24 10:45:07 UTC

[5/8] flink git commit: [FLINK-2916] [streaming] Expose operator and task information to StateBackend

[FLINK-2916] [streaming] Expose operator and task information to StateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad6f8265
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad6f8265
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad6f8265

Branch: refs/heads/master
Commit: ad6f826584be7527c58e2126e2828f82afc97875
Parents: 8cabe67
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Oct 26 09:58:49 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Nov 24 09:28:39 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/state/StateBackend.java       |  62 +++----
 .../state/filesystem/FsStateBackend.java        |  89 +++++-----
 .../state/memory/MemoryStateBackend.java        |  40 ++---
 .../operators/testutils/DummyEnvironment.java   | 167 +++++++++++++++++++
 .../runtime/state/FileStateBackendTest.java     | 109 ++++++------
 .../runtime/state/MemoryStateBackendTest.java   |  46 ++---
 .../flink/hdfstests/FileStateBackendTest.java   |  11 +-
 .../api/operators/AbstractStreamOperator.java   |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |   2 +-
 .../PartitionedStateCheckpointingITCase.java    |  21 +--
 10 files changed, 362 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index f8b1cfd..6f72bce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -31,32 +31,32 @@ import java.io.Serializable;
 
 /**
  * A state backend defines how state is stored and snapshotted during checkpoints.
- * 
+ *
  * @param <Backend> The type of backend itself. This generic parameter is used to refer to the
  *                  type of backend when creating state backed by this backend.
  */
 public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
-	
+
 	private static final long serialVersionUID = 4620413814639220247L;
-	
+
 	// ------------------------------------------------------------------------
 	//  initialization and cleanup
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * This method is called by the task upon deployment to initialize the state backend for
 	 * data for a specific job.
-	 * 
-	 * @param job The ID of the job for which the state backend instance checkpoints data.
+	 *
+	 * @param env The {@link Environment} of the task that instantiated the state backend
 	 * @throws Exception Overwritten versions of this method may throw exceptions, in which
 	 *                   case the job that uses the state backend is considered failed during
 	 *                   deployment.
 	 */
-	public abstract void initializeForJob(JobID job) throws Exception;
+	public abstract void initializeForJob(Environment env) throws Exception;
 
 	/**
 	 * Disposes all state associated with the current job.
-	 * 
+	 *
 	 * @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
 	 */
 	public abstract void disposeAllStateForCurrentJob() throws Exception;
@@ -64,33 +64,35 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 	/**
 	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
 	 * checkpoint data.
-	 * 
+	 *
 	 * @throws Exception Exceptions can be forwarded and will be logged by the system
 	 */
 	public abstract void close() throws Exception;
-	
+
 	// ------------------------------------------------------------------------
 	//  key/value state
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a key/value state backed by this state backend.
-	 * 
+	 *
+	 * @param operatorId Unique id for the operator creating the state
+	 * @param stateName Name of the created state
 	 * @param keySerializer The serializer for the key.
 	 * @param valueSerializer The serializer for the value.
 	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
 	 * @param <K> The type of the key.
 	 * @param <V> The type of the value.
-	 * 
+	 *
 	 * @return A new key/value state backed by this backend.
-	 * 
+	 *
 	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
 	 */
-	public abstract <K, V> KvState<K, V, Backend> createKvState(
+	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
 			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
 			V defaultValue) throws Exception;
-	
-	
+
+
 	// ------------------------------------------------------------------------
 	//  storing state for a checkpoint
 	// ------------------------------------------------------------------------
@@ -98,16 +100,16 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 	/**
 	 * Creates an output stream that writes into the state of the given checkpoint. When the stream
 	 * is closes, it returns a state handle that can retrieve the state back.
-	 * 
+	 *
 	 * @param checkpointID The ID of the checkpoint.
 	 * @param timestamp The timestamp of the checkpoint.
 	 * @return An output stream that writes state for the given checkpoint.
-	 * 
+	 *
 	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
 	 */
 	public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
 			long checkpointID, long timestamp) throws Exception;
-	
+
 	/**
 	 * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
 	 * When the stream is closes, it returns a state handle that can retrieve the state back.
@@ -125,20 +127,20 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 
 	/**
 	 * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
-	 * 
+	 *
 	 * @param state The state to be checkpointed.
 	 * @param checkpointID The ID of the checkpoint.
 	 * @param timestamp The timestamp of the checkpoint.
 	 * @param <S> The type of the state.
-	 * 
+	 *
 	 * @return A state handle that can retrieve the checkpoined state.
-	 * 
+	 *
 	 * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
 	 */
 	public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
 			S state, long checkpointID, long timestamp) throws Exception;
-	
-	
+
+
 	// ------------------------------------------------------------------------
 	//  Checkpoint state output stream
 	// ------------------------------------------------------------------------
@@ -151,7 +153,7 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 		/**
 		 * Closes the stream and gets a state handle that can create an input stream
 		 * producing the data written to this stream.
-		 * 
+		 *
 		 * @return A state handle that can create an input stream producing the data written to this stream.
 		 * @throws IOException Thrown, if the stream cannot be closed.
 		 */
@@ -162,9 +164,9 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 	 * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
 	 */
 	public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
-		
+
 		private final CheckpointStateOutputStream out;
-		
+
 		public CheckpointStateOutputView(CheckpointStateOutputStream out) {
 			super(out);
 			this.out = out;
@@ -193,7 +195,7 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 	private static final class DataInputViewHandle implements StateHandle<DataInputView> {
 
 		private static final long serialVersionUID = 2891559813513532079L;
-		
+
 		private final StreamStateHandle stream;
 
 		private DataInputViewHandle(StreamStateHandle stream) {
@@ -202,7 +204,7 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 
 		@Override
 		public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
-			return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader)); 
+			return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index d7b392c..6a94a80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.StateBackend;
-
+import org.apache.flink.runtime.state.StateHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,27 +37,27 @@ import java.util.UUID;
 
 /**
  * The file state backend is a state backend that stores the state of streaming jobs in a file system.
- * 
+ *
  * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
  * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
  * files for each state, for example:
- * 
+ *
  * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
  */
 public class FsStateBackend extends StateBackend<FsStateBackend> {
 
 	private static final long serialVersionUID = -8191916350224044011L;
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);
-	
-	
+
+
 	/** The path to the directory for the checkpoint data, including the file system
 	 * description via scheme and optional authority */
 	private final Path basePath;
-	
+
 	/** The directory (job specific) into this initialized instance of the backend stores its data */
 	private transient Path checkpointDirectory;
-	
+
 	/** Cached handle to the file system for file operations */
 	private transient FileSystem filesystem;
 
@@ -104,14 +103,14 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	/**
 	 * Creates a new state backend that stores its checkpoint data in the file system and location
 	 * defined by the given URI.
-	 * 
+	 *
 	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
 	 * must be accessible via {@link FileSystem#get(URI)}.
-	 * 
+	 *
 	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
 	 * (host and port), or that the Hadoop configuration that describes that information must be in the
 	 * classpath.
-	 * 
+	 *
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to teh checkpoint data directory.
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
@@ -119,7 +118,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	public FsStateBackend(URI checkpointDataUri) throws IOException {
 		final String scheme = checkpointDataUri.getScheme();
 		final String path = checkpointDataUri.getPath();
-		
+
 		// some validity checks
 		if (scheme == null) {
 			throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
@@ -132,12 +131,12 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 		if (path.length() == 0 || path.equals("/")) {
 			throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
 		}
-		
+
 		// we do a bit of work to make sure that the URI for the filesystem refers to exactly the same
 		// (distributed) filesystem on all hosts and includes full host/port information, even if the
 		// original URI did not include that. We count on the filesystem loading from the configuration
 		// to fill in the missing data.
-		
+
 		// try to grab the file system for this path/URI
 		this.filesystem = FileSystem.get(checkpointDataUri);
 		if (this.filesystem == null) {
@@ -151,7 +150,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 		}
 		catch (URISyntaxException e) {
 			throw new IOException(
-					String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s", 
+					String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s",
 							checkpointDataUri, fsURI), e);
 		}
 	}
@@ -159,7 +158,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	/**
 	 * Gets the base directory where all state-containing files are stored.
 	 * The job specific directory is created inside this directory.
-	 * 
+	 *
 	 * @return The base directory.
 	 */
 	public Path getBasePath() {
@@ -169,7 +168,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	/**
 	 * Gets the directory where this state backend stores its checkpoint data. Will be null if
 	 * the state backend has not been initialized.
-	 * 
+	 *
 	 * @return The directory where this state backend stores its checkpoint data.
 	 */
 	public Path getCheckpointDirectory() {
@@ -179,16 +178,16 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	/**
 	 * Checks whether this state backend is initialized. Note that initialization does not carry
 	 * across serialization. After each serialization, the state backend needs to be initialized.
-	 * 
+	 *
 	 * @return True, if the file state backend has been initialized, false otherwise.
 	 */
 	public boolean isInitialized() {
-		return filesystem != null && checkpointDirectory != null; 
+		return filesystem != null && checkpointDirectory != null;
 	}
 
 	/**
 	 * Gets the file system handle for the file system that stores the state for this backend.
-	 * 
+	 *
 	 * @return This backend's file system handle.
 	 */
 	public FileSystem getFileSystem() {
@@ -203,13 +202,13 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	// ------------------------------------------------------------------------
 	//  initialization and cleanup
 	// ------------------------------------------------------------------------
-	
+
 	@Override
-	public void initializeForJob(JobID jobId) throws Exception {
-		Path dir = new Path(basePath, jobId.toString());
-		
+	public void initializeForJob(Environment env) throws Exception {
+		Path dir = new Path(basePath, env.getJobID().toString());
+
 		LOG.info("Initializing file state backend to URI " + dir);
-		
+
 		filesystem = basePath.getFileSystem();
 		filesystem.mkdirs(dir);
 
@@ -220,7 +219,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	public void disposeAllStateForCurrentJob() throws Exception {
 		FileSystem fs = this.filesystem;
 		Path dir = this.checkpointDirectory;
-		
+
 		if (fs != null && dir != null) {
 			this.filesystem = null;
 			this.checkpointDirectory = null;
@@ -237,9 +236,9 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	// ------------------------------------------------------------------------
 	//  state backend operations
 	// ------------------------------------------------------------------------
-	
+
 	@Override
-	public <K, V> FsHeapKvState<K, V> createKvState(
+	public <K, V> FsHeapKvState<K, V> createKvState(int operatorId, String stateName,
 			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception {
 		return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, this);
 	}
@@ -254,7 +253,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 		final Path checkpointDir = createCheckpointDirPath(checkpointID);
 		filesystem.mkdirs(checkpointDir);
 
-		
+
 		Exception latestException = null;
 
 		for (int attempt = 0; attempt < 10; attempt++) {
@@ -273,19 +272,19 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 			}
 			return new FileSerializableStateHandle<S>(targetPath);
 		}
-		
+
 		throw new Exception("Could not open output stream for state backend", latestException);
 	}
-	
+
 	@Override
 	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
 		checkFileSystemInitialized();
-		
+
 		final Path checkpointDir = createCheckpointDirPath(checkpointID);
 		filesystem.mkdirs(checkpointDir);
-		
+
 		Exception latestException = null;
-		
+
 		for (int attempt = 0; attempt < 10; attempt++) {
 			Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString());
 			try {
@@ -298,7 +297,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 		}
 		throw new Exception("Could not open output stream for state backend", latestException);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  utilities
 	// ------------------------------------------------------------------------
@@ -308,18 +307,18 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 			throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
 		}
 	}
-	
+
 	private Path createCheckpointDirPath(long checkpointID) {
 		return new Path(checkpointDirectory, "chk-" + checkpointID);
 	}
-	
+
 	@Override
 	public String toString() {
 		return checkpointDirectory == null ?
-			"File State Backend @ " + basePath : 
+			"File State Backend @ " + basePath :
 			"File State Backend (initialized) @ " + checkpointDirectory;
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Output stream for state checkpointing
 	// ------------------------------------------------------------------------
@@ -331,11 +330,11 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream {
 
 		private final FSDataOutputStream outStream;
-		
+
 		private final Path filePath;
-		
+
 		private final FileSystem fs;
-		
+
 		private boolean closed;
 
 		FsCheckpointStateOutputStream(FSDataOutputStream outStream, Path filePath, FileSystem fs) {
@@ -373,7 +372,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 					try {
 						outStream.close();
 						fs.delete(filePath, false);
-						
+
 						// attempt to delete the parent (will fail and be ignored if the parent has more files)
 						try {
 							fs.delete(filePath.getParent(), false);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 8d297d4..f3e7552 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.state.memory;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.ByteArrayOutputStream;
@@ -31,15 +31,15 @@ import java.io.Serializable;
 /**
  * A {@link StateBackend} that stores all its data and checkpoints in memory and has no
  * capabilities to spill to disk. Checkpoints are serialized and the serialized data is
- * transferred 
+ * transferred
  */
 public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 
 	private static final long serialVersionUID = 4109305377809414635L;
-	
+
 	/** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
 	private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
-	
+
 	/** The maximal size that the snapshotted memory state may have */
 	private final int maxStateSize;
 
@@ -54,7 +54,7 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 	/**
 	 * Creates a new memory state backend that accepts states whose serialized forms are
 	 * up to the given number of bytes.
-	 * 
+	 *
 	 * @param maxStateSize The maximal size of the serialized state
 	 */
 	public MemoryStateBackend(int maxStateSize) {
@@ -66,7 +66,7 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void initializeForJob(JobID job) {
+	public void initializeForJob(Environment env) {
 		// nothing to do here
 	}
 
@@ -81,22 +81,22 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 	// ------------------------------------------------------------------------
 	//  State backend operations
 	// ------------------------------------------------------------------------
-	
+
 	@Override
-	public <K, V> MemHeapKvState<K, V> createKvState(
+	public <K, V> MemHeapKvState<K, V> createKvState(int operatorId, String stateName,
 			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
 		return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue);
 	}
-	
+
 	/**
 	 * Serialized the given state into bytes using Java serialization and creates a state handle that
 	 * can re-create that state.
-	 * 
+	 *
 	 * @param state The state to checkpoint.
 	 * @param checkpointID The ID of the checkpoint.
 	 * @param timestamp The timestamp of the checkpoint.
 	 * @param <S> The type of the state.
-	 * 
+	 *
 	 * @return A state handle that contains the given state serialized as bytes.
 	 * @throws Exception Thrown, if the serialization fails.
 	 */
@@ -119,7 +119,7 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
@@ -133,18 +133,18 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 							+ " . Consider using a different state backend, like the File System State backend.");
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	/**
 	 * A CheckpointStateOutputStream that writes into a byte array.
 	 */
 	public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {
-		
+
 		private final ByteArrayOutputStream os = new ByteArrayOutputStream();
-		
+
 		private final int maxSize;
-		
+
 		private boolean closed;
 
 		public MemoryCheckpointOutputStream(int maxSize) {
@@ -177,7 +177,7 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 		/**
 		 * Closes the stream and returns the byte array containing the stream's data.
 		 * @return The byte array containing the stream's data.
-		 * @throws IOException Thrown if the size of the data exceeds the maximal 
+		 * @throws IOException Thrown if the size of the data exceeds the maximal size
 		 */
 		public byte[] closeAndGetBytes() throws IOException {
 			if (!closed) {
@@ -191,11 +191,11 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 			}
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Static default instance
 	// ------------------------------------------------------------------------
-	
+
 	/** The default instance of this state backend, using the default maximal state size */
 	private static final MemoryStateBackend DEFAULT_INSTANCE = new MemoryStateBackend();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
new file mode 100644
index 0000000..71bec4a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+
+public class DummyEnvironment implements Environment {
+
+	private final String taskName;
+	private final int numSubTasks;
+	private final int subTaskIndex;
+	private final JobID jobId = new JobID();
+	private final JobVertexID jobVertexId = new JobVertexID();
+
+	public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) {
+		this.taskName = taskName;
+		this.numSubTasks = numSubTasks;
+		this.subTaskIndex = subTaskIndex;
+	}
+
+	@Override
+	public JobID getJobID() {
+		return jobId;
+	}
+
+	@Override
+	public JobVertexID getJobVertexId() {
+		return jobVertexId;
+	}
+
+	@Override
+	public ExecutionAttemptID getExecutionId() {
+		return null;
+	}
+
+	@Override
+	public Configuration getTaskConfiguration() {
+		return null;
+	}
+
+	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return null;
+	}
+
+	@Override
+	public Configuration getJobConfiguration() {
+		return null;
+	}
+
+	@Override
+	public int getNumberOfSubtasks() {
+		return numSubTasks;
+	}
+
+	@Override
+	public int getIndexInSubtaskGroup() {
+		return subTaskIndex;
+	}
+
+	@Override
+	public InputSplitProvider getInputSplitProvider() {
+		return null;
+	}
+
+	@Override
+	public IOManager getIOManager() {
+		return null;
+	}
+
+	@Override
+	public MemoryManager getMemoryManager() {
+		return null;
+	}
+
+	@Override
+	public String getTaskName() {
+		return taskName;
+	}
+
+	@Override
+	public String getTaskNameWithSubtasks() {
+		return taskName;
+	}
+
+	@Override
+	public ClassLoader getUserClassLoader() {
+		return null;
+	}
+
+	@Override
+	public Map<String, Future<Path>> getDistributedCacheEntries() {
+		return null;
+	}
+
+	@Override
+	public BroadcastVariableManager getBroadcastVariableManager() {
+		return null;
+	}
+
+	@Override
+	public AccumulatorRegistry getAccumulatorRegistry() {
+		return null;
+	}
+
+	@Override
+	public void acknowledgeCheckpoint(long checkpointId) {
+	}
+
+	@Override
+	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+	}
+
+	@Override
+	public ResultPartitionWriter getWriter(int index) {
+		return null;
+	}
+
+	@Override
+	public ResultPartitionWriter[] getAllWriters() {
+		return null;
+	}
+
+	@Override
+	public InputGate getInputGate(int index) {
+		return null;
+	}
+
+	@Override
+	public InputGate[] getAllInputGates() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index a6cfae3..fc5d8c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -18,8 +18,22 @@
 
 package org.apache.flink.runtime.state;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+import java.util.UUID;
+
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -29,41 +43,34 @@ import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.StringValue;
+import org.apache.flink.util.OperatingSystem;
 
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.Random;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
 public class FileStateBackendTest {
-	
+
 	@Test
 	public void testSetupAndSerialization() {
 		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
 		try {
 			final String backendDir = localFileUri(tempDir);
 			FsStateBackend originalBackend = new FsStateBackend(backendDir);
-			
+
 			assertFalse(originalBackend.isInitialized());
 			assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri());
 			assertNull(originalBackend.getCheckpointDirectory());
-			
+
 			// serialize / copy the backend
 			FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
 			assertFalse(backend.isInitialized());
 			assertEquals(new URI(backendDir), backend.getBasePath().toUri());
 			assertNull(backend.getCheckpointDirectory());
-			
+
 			// no file operations should be possible right now
 			try {
 				backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
@@ -71,17 +78,17 @@ public class FileStateBackendTest {
 			} catch (IllegalStateException e) {
 				// supreme!
 			}
-			
-			backend.initializeForJob(new JobID());
+
+			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
 			assertNotNull(backend.getCheckpointDirectory());
-			
+
 			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
 			assertTrue(checkpointDir.exists());
 			assertTrue(isDirectoryEmpty(checkpointDir));
-			
+
 			backend.disposeAllStateForCurrentJob();
 			assertNull(backend.getCheckpointDirectory());
-			
+
 			assertTrue(isDirectoryEmpty(tempDir));
 		}
 		catch (Exception e) {
@@ -92,20 +99,20 @@ public class FileStateBackendTest {
 			deleteDirectorySilently(tempDir);
 		}
 	}
-	
+
 	@Test
 	public void testSerializableState() {
 		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
 		try {
 			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new JobID());
+			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
 
 			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
 
 			String state1 = "dummy state";
 			String state2 = "row row row your boat";
 			Integer state3 = 42;
-			
+
 			StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
 			StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
 			StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
@@ -113,15 +120,15 @@ public class FileStateBackendTest {
 			assertFalse(isDirectoryEmpty(checkpointDir));
 			assertEquals(state1, handle1.getState(getClass().getClassLoader()));
 			handle1.discardState();
-			
+
 			assertFalse(isDirectoryEmpty(checkpointDir));
 			assertEquals(state2, handle2.getState(getClass().getClassLoader()));
 			handle2.discardState();
-			
+
 			assertFalse(isDirectoryEmpty(checkpointDir));
 			assertEquals(state3, handle3.getState(getClass().getClassLoader()));
 			handle3.discardState();
-			
+
 			assertTrue(isDirectoryEmpty(checkpointDir));
 		}
 		catch (Exception e) {
@@ -138,7 +145,7 @@ public class FileStateBackendTest {
 		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
 		try {
 			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new JobID());
+			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
 
 			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
 
@@ -146,7 +153,7 @@ public class FileStateBackendTest {
 			byte[] state2 = new byte[1];
 			byte[] state3 = new byte[0];
 			byte[] state4 = new byte[177];
-			
+
 			Random rnd = new Random();
 			rnd.nextBytes(state1);
 			rnd.nextBytes(state2);
@@ -155,21 +162,21 @@ public class FileStateBackendTest {
 
 			long checkpointId = 97231523452L;
 
-			FsStateBackend.FsCheckpointStateOutputStream stream1 = 
+			FsStateBackend.FsCheckpointStateOutputStream stream1 =
 					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
 			FsStateBackend.FsCheckpointStateOutputStream stream2 =
 					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
 			FsStateBackend.FsCheckpointStateOutputStream stream3 =
 					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-			
+
 			stream1.write(state1);
 			stream2.write(state2);
 			stream3.write(state3);
-			
+
 			FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
 			FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
 			FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
-			
+
 			// use with try-with-resources
 			StreamStateHandle handle4;
 			try (StateBackend.CheckpointStateOutputStream stream4 =
@@ -177,7 +184,7 @@ public class FileStateBackendTest {
 				stream4.write(state4);
 				handle4 = stream4.closeAndGetHandle();
 			}
-			
+
 			// close before accessing handle
 			StateBackend.CheckpointStateOutputStream stream5 =
 					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
@@ -189,22 +196,22 @@ public class FileStateBackendTest {
 			} catch (IOException e) {
 				// uh-huh
 			}
-			
+
 			validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
 			handle1.discardState();
 			assertFalse(isDirectoryEmpty(checkpointDir));
 			ensureLocalFileDeleted(handle1.getFilePath());
-			
+
 			validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
 			handle2.discardState();
 			assertFalse(isDirectoryEmpty(checkpointDir));
 			ensureLocalFileDeleted(handle2.getFilePath());
-			
+
 			validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
 			handle3.discardState();
 			assertFalse(isDirectoryEmpty(checkpointDir));
 			ensureLocalFileDeleted(handle3.getFilePath());
-			
+
 			validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
 			handle4.discardState();
 			assertTrue(isDirectoryEmpty(checkpointDir));
@@ -223,12 +230,12 @@ public class FileStateBackendTest {
 		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
 		try {
 			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new JobID());
+			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
 
 			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
 
 			KvState<Integer, String, FsStateBackend> kv =
-					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+					backend.createKvState(0, "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
 
 			assertEquals(0, kv.size());
 
@@ -312,12 +319,12 @@ public class FileStateBackendTest {
 		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
 		try {
 			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new JobID());
+			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
 
 			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-			
+
 			KvState<Integer, String, FsStateBackend> kv =
-					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+					backend.createKvState(0, "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
 
 			kv.setCurrentKey(1);
 			kv.update("1");
@@ -365,7 +372,7 @@ public class FileStateBackendTest {
 			} catch (Exception e) {
 				fail("wrong exception");
 			}
-			
+
 			snapshot.discardState();
 
 			assertTrue(isDirectoryEmpty(checkpointDir));
@@ -384,10 +391,10 @@ public class FileStateBackendTest {
 		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
 		try {
 			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new JobID());
-			
+			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
+
 			KvState<Integer, IntValue, FsStateBackend> kv =
-					backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+					backend.createKvState(0, "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
 
 			kv.setCurrentKey(1);
 			IntValue default1 = kv.value();
@@ -408,11 +415,11 @@ public class FileStateBackendTest {
 			deleteDirectorySilently(tempDir);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	private static void ensureLocalFileDeleted(Path path) {
 		URI uri = path.toUri();
 		if ("file".equals(uri.getScheme())) {
@@ -423,23 +430,23 @@ public class FileStateBackendTest {
 			throw new IllegalArgumentException("not a local path");
 		}
 	}
-	
+
 	private static void deleteDirectorySilently(File dir) {
 		try {
 			FileUtils.deleteDirectory(dir);
 		}
 		catch (IOException ignored) {}
 	}
-	
+
 	private static boolean isDirectoryEmpty(File directory) {
 		String[] nested = directory.list();
 		return  nested == null || nested.length == 0;
 	}
-	
+
 	private static String localFileUri(File path) {
 		return path.toURI().toString();
 	}
-	
+
 	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
 		byte[] holder = new byte[data.length];
 		assertEquals("not enough data", holder.length, is.read(holder));

http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index f6d1bb5..ae027e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -40,7 +40,7 @@ import static org.junit.Assert.*;
  * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
  */
 public class MemoryStateBackendTest {
-	
+
 	@Test
 	public void testSerializableState() {
 		try {
@@ -49,10 +49,10 @@ public class MemoryStateBackendTest {
 			HashMap<String, Integer> state = new HashMap<>();
 			state.put("hey there", 2);
 			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-			
+
 			StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459);
 			assertNotNull(handle);
-			
+
 			HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader());
 			assertEquals(state, restored);
 		}
@@ -99,7 +99,7 @@ public class MemoryStateBackendTest {
 			oos.writeObject(state);
 			oos.flush();
 			StreamStateHandle handle = os.closeAndGetHandle();
-			
+
 			assertNotNull(handle);
 
 			ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader()));
@@ -124,7 +124,7 @@ public class MemoryStateBackendTest {
 
 			StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
 			ObjectOutputStream oos = new ObjectOutputStream(os);
-			
+
 			try {
 				oos.writeObject(state);
 				oos.flush();
@@ -140,17 +140,17 @@ public class MemoryStateBackendTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testKeyValueState() {
 		try {
 			MemoryStateBackend backend = new MemoryStateBackend();
-			
-			KvState<Integer, String, MemoryStateBackend> kv = 
-					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-			
+
+			KvState<Integer, String, MemoryStateBackend> kv =
+					backend.createKvState(0, "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
 			assertEquals(0, kv.size());
-			
+
 			// some modifications to the state
 			kv.setCurrentKey(1);
 			assertNull(kv.value());
@@ -163,7 +163,7 @@ public class MemoryStateBackendTest {
 			kv.setCurrentKey(1);
 			assertEquals("1", kv.value());
 			assertEquals(2, kv.size());
-			
+
 			// draw a snapshot
 			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 = 
 					kv.snapshot(682375462378L, System.currentTimeMillis());
@@ -188,9 +188,9 @@ public class MemoryStateBackendTest {
 			assertEquals("u2", kv.value());
 			kv.setCurrentKey(3);
 			assertEquals("u3", kv.value());
-			
+
 			// restore the first snapshot and validate it
-			KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend, 
+			KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend,
 							IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
 
 			assertEquals(2, restored1.size());
@@ -216,29 +216,29 @@ public class MemoryStateBackendTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testRestoreWithWrongSerializers() {
 		try {
 			MemoryStateBackend backend = new MemoryStateBackend();
 			KvState<Integer, String, MemoryStateBackend> kv =
-					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-			
+					backend.createKvState(0, "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
 			kv.setCurrentKey(1);
 			kv.update("1");
 			kv.setCurrentKey(2);
 			kv.update("2");
-			
+
 			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
 					kv.snapshot(682375462378L, System.currentTimeMillis());
 
 
 			@SuppressWarnings("unchecked")
-			TypeSerializer<Integer> fakeIntSerializer = 
+			TypeSerializer<Integer> fakeIntSerializer =
 					(TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
 
 			@SuppressWarnings("unchecked")
-			TypeSerializer<String> fakeStringSerializer = 
+			TypeSerializer<String> fakeStringSerializer =
 					(TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
 
 			try {
@@ -276,20 +276,20 @@ public class MemoryStateBackendTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCopyDefaultValue() {
 		try {
 			MemoryStateBackend backend = new MemoryStateBackend();
 			KvState<Integer, IntValue, MemoryStateBackend> kv =
-					backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+					backend.createKvState(0, "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
 
 			kv.setCurrentKey(1);
 			IntValue default1 = kv.value();
 
 			kv.setCurrentKey(2);
 			IntValue default2 = kv.value();
-			
+
 			assertNotNull(default1);
 			assertNotNull(default2);
 			assertEquals(default1, default2);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 4e4acd2..4fb6820 100644
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.hdfstests;
 
 import org.apache.commons.io.FileUtils;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -29,7 +28,7 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.hadoop.conf.Configuration;
@@ -63,7 +62,7 @@ public class FileStateBackendTest {
 	private static MiniDFSCluster HDFS_CLUSTER;
 	
 	private static FileSystem FS;
-
+	
 	// ------------------------------------------------------------------------
 	//  startup / shutdown
 	// ------------------------------------------------------------------------
@@ -127,7 +126,7 @@ public class FileStateBackendTest {
 				// supreme!
 			}
 
-			backend.initializeForJob(new JobID());
+			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
 			assertNotNull(backend.getCheckpointDirectory());
 
 			Path checkpointDir = backend.getCheckpointDirectory();
@@ -150,7 +149,7 @@ public class FileStateBackendTest {
 		
 		try {
 			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
-			backend.initializeForJob(new JobID());
+			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
 
 			Path checkpointDir = backend.getCheckpointDirectory();
 
@@ -186,7 +185,7 @@ public class FileStateBackendTest {
 	public void testStateOutputStream() {
 		try {
 			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
-			backend.initializeForJob(new JobID());
+			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
 
 			Path checkpointDir = backend.getCheckpointDirectory();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index ce4d174..6c1f1ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -322,7 +322,7 @@ public abstract class AbstractStreamOperator<OUT>
 		
 		if (kvstate == null) {
 			// create a new blank key/value state
-			kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
+			kvstate = stateBackend.createKvState(getOperatorConfig().getVertexID() ,name , keySerializer, valueSerializer, defaultValue);
 		}
 
 		if (keyValueStatesByName == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 80c63da..ed7182d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -169,7 +169,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			accumulatorMap = accumulatorRegistry.getUserMap();
 
 			stateBackend = createStateBackend();
-			stateBackend.initializeForJob(getEnvironment().getJobID());
+			stateBackend.initializeForJob(getEnvironment());
 
 			headOperator = configuration.getStreamOperator(userClassLoader);
 			operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());

http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 67c0189..42b6230 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -50,10 +50,11 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 	final long NUM_STRINGS = 10_000_000L;
+	final static int NUM_KEYS = 40;
 
 	@Override
 	public void testProgram(StreamExecutionEnvironment env) {
-		assertTrue("Broken test setup", (NUM_STRINGS/2) % 40 == 0);
+		assertTrue("Broken test setup", (NUM_STRINGS/2) % NUM_KEYS == 0);
 
 		DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
 		DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
@@ -69,14 +70,14 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 	public void postSubmit() {
 		// verify that we counted exactly right
 		for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
-			assertEquals(new Long(sum.getKey() * NUM_STRINGS / 40), sum.getValue());
+			assertEquals(new Long(sum.getKey() * NUM_STRINGS / NUM_KEYS), sum.getValue());
 		}
 		for (Long count : CounterSink.allCounts.values()) {
-			assertEquals(new Long(NUM_STRINGS / 40), count);
+			assertEquals(new Long(NUM_STRINGS / NUM_KEYS), count);
 		}
 
-		assertEquals(40, CounterSink.allCounts.size());
-		assertEquals(40, OnceFailingPartitionedSum.allSums.size());
+		assertEquals(NUM_KEYS, CounterSink.allCounts.size());
+		assertEquals(NUM_KEYS, OnceFailingPartitionedSum.allSums.size());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -120,7 +121,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 
 				synchronized (lockingObject) {
 					index += step;
-					ctx.collect(index % 40);
+					ctx.collect(index % NUM_KEYS);
 				}
 			}
 		}
@@ -160,9 +161,9 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 
 		@Override
 		public void open(Configuration parameters) throws IOException {
-			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext()
+			long failurePosMin = (long) (0.6 * numElements / getRuntimeContext()
 					.getNumberOfParallelSubtasks());
-			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext()
+			long failurePosMax = (long) (0.8 * numElements / getRuntimeContext()
 					.getNumberOfParallelSubtasks());
 
 			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
@@ -213,7 +214,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 		}
 	}
 	
-	private static class NonSerializableLong {
+	public static class NonSerializableLong {
 		public Long value;
 
 		private NonSerializableLong(long value) {
@@ -225,7 +226,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 		}
 	}
 	
-	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+	public static class IdentityKeySelector<T> implements KeySelector<T, T> {
 
 		@Override
 		public T getKey(T value) throws Exception {