You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/09 18:29:36 UTC

[3/3] flink git commit: [FLINK-5982] [runtime] Refactor AbstractInvokable and StatefulTask to accept Environment and State in the constructor.

[FLINK-5982] [runtime] Refactor AbstractInvokable and StatefulTask to accept Environment and State in the constructor.

This is the first step towards implementing an RAII pattern for all task runtime classes.

This closes #3633


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6033de01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6033de01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6033de01

Branch: refs/heads/master
Commit: 6033de01ae620ebc9735c552ce85ccd1687793d7
Parents: 4c3f607
Author: Tony Wei <to...@gmail.com>
Authored: Wed Mar 15 11:13:41 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 9 18:30:59 2018 +0100

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  16 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |  22 +-
 .../runtime/checkpoint/CheckpointOptions.java   |   5 +-
 .../iterative/task/AbstractIterativeTask.java   |  12 +
 .../iterative/task/IterationHeadTask.java       |  12 +
 .../task/IterationIntermediateTask.java         |  14 +
 .../task/IterationSynchronizationSinkTask.java  |  12 +
 .../iterative/task/IterationTailTask.java       |  14 +
 .../jobgraph/tasks/AbstractInvokable.java       | 114 +++++++--
 .../runtime/jobgraph/tasks/StatefulTask.java    |  88 -------
 .../flink/runtime/operators/BatchTask.java      |  17 +-
 .../flink/runtime/operators/DataSinkTask.java   |  16 +-
 .../flink/runtime/operators/DataSourceTask.java |  12 +-
 .../apache/flink/runtime/taskmanager/Task.java  | 255 ++++++++++---------
 .../checkpoint/CoordinatorShutdownTest.java     |  22 +-
 .../client/JobClientActorRecoveryITCase.java    |   5 +
 .../PartialConsumePipelinedResultTest.java      |   9 +
 .../jobmanager/JobManagerHARecoveryTest.java    |  40 +--
 .../runtime/jobmanager/JobManagerTest.java      |  17 +-
 .../SlotCountExceedingParallelismTest.java      |  23 +-
 .../ScheduleOrUpdateConsumersTest.java          |  13 +-
 .../runtime/operators/DataSinkTaskTest.java     |  28 +-
 .../runtime/operators/DataSourceTaskTest.java   |   8 +-
 .../InvokableClassConstructorTest.java          |  67 +++++
 .../operators/chaining/ChainTaskTest.java       |  18 +-
 .../chaining/ChainedAllReduceDriverTest.java    |   4 +-
 .../operators/testutils/DummyInvokable.java     |  12 +
 .../operators/testutils/TaskTestBase.java       |  34 +--
 .../BackPressureStatsTrackerITCase.java         |   5 +
 .../runtime/taskexecutor/TaskExecutorTest.java  |   5 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  27 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |   9 +
 .../runtime/taskmanager/TaskManagerTest.java    |  13 +
 .../flink/runtime/taskmanager/TaskTest.java     |  46 +++-
 .../testtasks/BlockingNoOpInvokable.java        |   5 +
 .../testtasks/FailingBlockingInvokable.java     |  10 +
 .../flink/runtime/testtasks/NoOpInvokable.java  |   5 +
 .../runtime/testtasks/WaitingNoOpInvokable.java |   5 +
 .../runtime/testutils/StoppableInvokable.java   |   9 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   5 +
 .../apache/flink/runtime/jobmanager/Tasks.scala |  43 +++-
 .../streaming/runtime/io/BarrierBuffer.java     |   6 +-
 .../streaming/runtime/io/BarrierTracker.java    |   8 +-
 .../runtime/io/CheckpointBarrierHandler.java    |   4 +-
 .../runtime/io/StreamInputProcessor.java        |   4 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   4 +-
 .../runtime/tasks/OneInputStreamTask.java       |  34 +++
 .../runtime/tasks/SourceStreamTask.java         |   8 +
 .../tasks/StoppableSourceStreamTask.java        |   6 +
 .../runtime/tasks/StreamIterationHead.java      |   8 +
 .../runtime/tasks/StreamIterationTail.java      |   6 +
 .../streaming/runtime/tasks/StreamTask.java     |  64 +++--
 .../runtime/tasks/TwoInputStreamTask.java       |  14 +
 .../operators/async/AsyncWaitOperatorTest.java  |  27 +-
 .../api/streamtask/StreamIterationHeadTest.java |   4 +-
 .../io/BarrierBufferAlignmentLimitTest.java     |   6 +-
 .../streaming/runtime/io/BarrierBufferTest.java |  26 +-
 .../runtime/io/BarrierTrackerTest.java          |  11 +-
 .../runtime/operators/StreamTaskTimerTest.java  |  17 +-
 .../TestProcessingTimeServiceTest.java          |  12 +-
 ...kpointExceptionHandlerConfigurationTest.java |  23 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |  72 +++---
 .../tasks/OneInputStreamTaskTestHarness.java    |  30 ++-
 .../runtime/tasks/RestoreStreamTaskTest.java    |  47 +---
 .../SourceExternalCheckpointTriggerTest.java    |  10 +-
 .../tasks/SourceStreamTaskStoppingTest.java     |   9 +-
 .../runtime/tasks/SourceStreamTaskTest.java     |  17 +-
 .../StreamTaskCancellationBarrierTest.java      |  22 +-
 .../tasks/StreamTaskTerminationTest.java        |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  51 +++-
 .../runtime/tasks/StreamTaskTestHarness.java    |  58 ++++-
 .../tasks/StreamTaskTestHarnessTest.java        |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java   |   6 +
 .../runtime/tasks/TwoInputStreamTaskTest.java   |  26 +-
 .../tasks/TwoInputStreamTaskTestHarness.java    |  20 +-
 .../test/example/client/JobRetrievalITCase.java |   5 +
 .../runtime/NetworkStackThroughputITCase.java   |  13 +
 .../manual/MassiveCaseClassSortingITCase.scala  |  26 +-
 .../JobManagerLeaderSessionIDITCase.scala       |   3 +-
 79 files changed, 1196 insertions(+), 617 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 71c5e77..8dc504a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -111,9 +111,9 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 	@Test
 	public void testFullyAsyncSnapshot() throws Exception {
 
-		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
-
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+				OneInputStreamTask::new,
+				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 		testHarness.setupOutputForSingletonOperatorChain();
 
 		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
@@ -179,6 +179,8 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 		testHarness.invoke(mockEnv);
 
+		final OneInputStreamTask<String, String> task = testHarness.getTask();
+
 		// wait for the task to be running
 		for (Field field: StreamTask.class.getDeclaredFields()) {
 			if (field.getName().equals("isRunning")) {
@@ -213,13 +215,13 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 	/**
 	 * This tests ensures that canceling of asynchronous snapshots works as expected and does not block.
-	 * @throws Exception
 	 */
 	@Test
 	public void testCancelFullyAsyncCheckpoints() throws Exception {
-		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+				OneInputStreamTask::new,
+				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 		testHarness.setupOutputForSingletonOperatorChain();
 
 		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
@@ -278,6 +280,8 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 		testHarness.invoke(mockEnv);
 
+		final OneInputStreamTask<String, String> task = testHarness.getTask();
+
 		// wait for the task to be running
 		for (Field field: StreamTask.class.getDeclaredFields()) {
 			if (field.getName().equals("isRunning")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index c74db28..c79e3d7 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.jmx.JMXReporter;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -120,24 +122,20 @@ public class JMXJobManagerMetricTest {
 	 * Utility to block/unblock a task.
 	 */
 	public static class BlockingInvokable extends AbstractInvokable {
-		private static boolean blocking = true;
-		private static final Object lock = new Object();
+
+		private static final OneShotLatch LATCH = new OneShotLatch();
+
+		public BlockingInvokable(Environment environment) {
+			super(environment);
+		}
 
 		@Override
 		public void invoke() throws Exception {
-			while (blocking) {
-				synchronized (lock) {
-					lock.wait();
-				}
-			}
+			LATCH.await();
 		}
 
 		public static void unblock() {
-			blocking = false;
-
-			synchronized (lock) {
-				lock.notifyAll();
-			}
+			LATCH.trigger();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index 27e5d0c..813c292 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -18,19 +18,20 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.io.Serializable;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 /**
  * Options for performing the checkpoint.
  *
  * <p>The {@link CheckpointProperties} are related and cover properties that
  * are only relevant at the {@link CheckpointCoordinator}. These options are
- * relevant at the {@link StatefulTask} instances running on task managers.
+ * relevant at the {@link AbstractInvokable} instances running on task managers.
  */
 public class CheckpointOptions implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
index a36fc57..19eb3d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -66,6 +66,7 @@ import java.util.concurrent.Future;
  */
 public abstract class AbstractIterativeTask<S extends Function, OT> extends BatchTask<S, OT>
 		implements Terminable {
+
 	private static final Logger log = LoggerFactory.getLogger(AbstractIterativeTask.class);
 
 	protected LongSumAggregator worksetAggregator;
@@ -87,6 +88,17 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 	private volatile boolean terminationRequested;
 
 	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Create an Invokable task and set its environment.
+	 *
+	 * @param environment The environment assigned to this invokable.
+	 */
+	public AbstractIterativeTask(Environment environment) {
+		super(environment);
+	}
+
+	// --------------------------------------------------------------------------------------------
 	// Main life cycle methods that implement the iterative behavior
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 2a95a65..1dd3da4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -100,6 +101,17 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Create an Invokable task and set its environment.
+	 *
+	 * @param environment The environment assigned to this invokable.
+	 */
+	public IterationHeadTask(Environment environment) {
+		super(environment);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
 	@Override
 	protected int getNumTaskInputs() {
 		// this task has an additional input in the workset case for the initial solution set

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
index c5fd133..d1bd5db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.iterative.task;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -49,6 +50,19 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
 
 	private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
 
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Create an Invokable task and set its environment.
+	 *
+	 * @param environment The environment assigned to this invokable.
+	 */
+	public IterationIntermediateTask(Environment environment) {
+		super(environment);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
 	@Override
 	protected void initialize() throws Exception {
 		super.initialize();

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 6a38fcc..14f3612 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
@@ -73,6 +74,17 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Create an Invokable task and set its environment.
+	 *
+	 * @param environment The environment assigned to this invokable.
+	 */
+	public IterationSynchronizationSinkTask(Environment environment) {
+		super(environment);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
 	@Override
 	public void invoke() throws Exception {
 		this.headEventReader = new MutableRecordReader<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
index 3ec3a8e..7217bfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.iterative.task;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
 import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
@@ -47,6 +48,19 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
 
 	private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
 
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Create an Invokable task and set its environment.
+	 *
+	 * @param environment The environment assigned to this invokable.
+	 */
+	public IterationTailTask(Environment environment) {
+		super(environment);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
 	@Override
 	protected void initialize() throws Exception {
 		super.initialize();

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index f63a762..1734d68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -20,22 +20,53 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.BatchTask;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This is the abstract base class for every task that can be executed by a
- * TaskManager. Concrete tasks like the vertices of batch jobs (see
- * {@link BatchTask} inherit from this class.
+ * This is the abstract base class for every task that can be executed by a TaskManager.
+ * Concrete tasks extend this class, for example the streaming and batch tasks.
  *
  * <p>The TaskManager invokes the {@link #invoke()} method when executing a
  * task. All operations of the task happen in this method (setting up input
  * output stream readers and writers as well as the task's core operation).
+ *
+ * <p>All classes that extend must offer a constructor {@code MyTask(Environment, TaskStateSnapshot)}.
+ * Tasks that are always stateless can, for convenience, also only implement the constructor
+ * {@code MyTask(Environment)}.
+ *
+ * <p><i>Developer note: While constructors cannot be enforced at compile time, we did not yet venture
+ * on the endeavor of introducing factories (it is only an internal API after all, and with Java 8,
+ * one can use {@code Class::new} almost like a factory lambda).</i>
+ *
+ * <p><b>NOTE:</b> There is no constructor that accepts an initial task state snapshot
+ * and stores it in a variable. That is on purpose, because the AbstractInvokable itself
+ * does not need the state snapshot (only subclasses such as StreamTask do need the state)
+ * and we do not want to store a reference indefinitely, thus preventing cleanup of
+ * the initial state structure by the Garbage Collector.
+ *
+ * <p>Any subclass that supports recoverable state and participates in
+ * checkpointing needs to override {@link #triggerCheckpoint(CheckpointMetaData, CheckpointOptions)},
+ * {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)},
+ * {@link #abortCheckpointOnBarrier(long, Throwable)} and {@link #notifyCheckpointComplete(long)}.
  */
 public abstract class AbstractInvokable {
 
 	/** The environment assigned to this invokable. */
-	private Environment environment;
+	private final Environment environment;
+
+	/**
+	 * Create an Invokable task and set its environment.
+	 *
+	 * @param environment The environment assigned to this invokable.
+	 */
+	public AbstractInvokable(Environment environment) {
+		this.environment = checkNotNull(environment);
+	}
 
 	/**
 	 * Starts the execution.
@@ -46,7 +77,7 @@ public abstract class AbstractInvokable {
 	 *
 	 * <p>All resources should be cleaned up when the method returns. Make sure
 	 * to guard the code with <code>try-finally</code> blocks where necessary.
-	 * 
+	 *
 	 * @throws Exception
 	 *         Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
 	 */
@@ -62,16 +93,6 @@ public abstract class AbstractInvokable {
 	public void cancel() throws Exception {
 		// The default implementation does nothing.
 	}
-	
-	/**
-	 * Sets the environment of this task.
-	 * 
-	 * @param environment
-	 *        the environment of this task
-	 */
-	public final void setEnvironment(Environment environment) {
-		this.environment = environment;
-	}
 
 	/**
 	 * Returns the environment of this task.
@@ -133,4 +154,65 @@ public abstract class AbstractInvokable {
 	public ExecutionConfig getExecutionConfig() {
 		return this.environment.getExecutionConfig();
 	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpointing Methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method is called to trigger a checkpoint, asynchronously by the checkpoint
+	 * coordinator.
+	 *
+	 * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
+	 * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
+	 * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)}
+	 * method.
+	 *
+	 * @param checkpointMetaData Meta data for about this checkpoint
+	 * @param checkpointOptions Options for performing this checkpoint
+	 *
+	 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
+	 */
+	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
+		throw new UnsupportedOperationException(String.format("triggerCheckpoint not supported by %s", this.getClass().getName()));
+	}
+
+	/**
+	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
+	 * barriers on all input streams.
+	 *
+	 * @param checkpointMetaData Meta data for about this checkpoint
+	 * @param checkpointOptions Options for performing this checkpoint
+	 * @param checkpointMetrics Metrics about this checkpoint
+	 *
+	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
+	 */
+	public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
+		throw new UnsupportedOperationException(String.format("triggerCheckpointOnBarrier not supported by %s", this.getClass().getName()));
+	}
+
+	/**
+	 * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
+	 * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
+	 *
+	 * <p>This requires implementing tasks to forward a
+	 * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
+	 *
+	 * @param checkpointId The ID of the checkpoint to be aborted.
+	 * @param cause The reason why the checkpoint was aborted during alignment
+	 */
+	public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
+		throw new UnsupportedOperationException(String.format("abortCheckpointOnBarrier not supported by %s", this.getClass().getName()));
+	}
+
+	/**
+	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
+	 * the notification from all participating tasks.
+	 *
+	 * @param checkpointId The ID of the checkpoint that is complete..
+	 * @throws Exception The notification method may forward its exceptions.
+	 */
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		throw new UnsupportedOperationException(String.format("notifyCheckpointComplete not supported by %s", this.getClass().getName()));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
deleted file mode 100644
index 00db01f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-
-/**
- * This interface must be implemented by any invokable that has recoverable state and participates
- * in checkpointing.
- */
-public interface StatefulTask {
-
-	/**
-	 * Sets the initial state of the operator, upon recovery. The initial state is typically
-	 * a snapshot of the state from a previous execution.
-	 *
-	 * @param taskStateHandles All state handle for the task.
-	 */
-	void setInitialState(TaskStateSnapshot taskStateHandles) throws Exception;
-
-	/**
-	 * This method is called to trigger a checkpoint, asynchronously by the checkpoint
-	 * coordinator.
-	 * 
-	 * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
-	 * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
-	 * receiving checkpoint barriers, invoke the
-	 * {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)} method.
-	 *
-	 * @param checkpointMetaData Meta data for about this checkpoint
-	 * @param checkpointOptions Options for performing this checkpoint
-	 *
-	 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
-	 */
-	boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception;
-
-	/**
-	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
-	 * barriers on all input streams.
-	 * 
-	 * @param checkpointMetaData Meta data for about this checkpoint
-	 * @param checkpointOptions Options for performing this checkpoint
-	 * @param checkpointMetrics Metrics about this checkpoint
-	 * 
-	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
-	 */
-	void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception;
-
-	/**
-	 * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
-	 * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
-	 * 
-	 * <p>This requires implementing tasks to forward a
-	 * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
-	 * 
-	 * @param checkpointId The ID of the checkpoint to be aborted.
-	 * @param cause The reason why the checkpoint was aborted during alignment   
-	 */
-	void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception;
-
-	/**
-	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
-	 * the notification from all participating tasks.
-	 *
-	 * @param checkpointId The ID of the checkpoint that is complete..
-	 * @throws Exception The notification method may forward its exceptions.
-	 */
-	void notifyCheckpointComplete(long checkpointId) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index be81877..e697869 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -79,7 +79,7 @@ import java.util.Map;
 public class BatchTask<S extends Function, OT> extends AbstractInvokable implements TaskContext<S, OT> {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(BatchTask.class);
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -216,6 +216,19 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 	private OperatorMetricGroup metrics;
 
 	// --------------------------------------------------------------------------------------------
+	//                                  Constructor
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates the batch task, passing the given environment on to the {@link AbstractInvokable} base class.
+	 *
+	 * @param environment The environment assigned to this invokable.
+	 */
+	public BatchTask(Environment environment) {
+		super(environment);
+	}
+
+	// --------------------------------------------------------------------------------------------
 	//                                  Task Interface
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index bb253ab..0ea376e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,12 +56,12 @@ import org.slf4j.LoggerFactory;
  * @see OutputFormat
  */
 public class DataSinkTask<IT> extends AbstractInvokable {
-	
+
 	// Obtain DataSinkTask Logger
 	private static final Logger LOG = LoggerFactory.getLogger(DataSinkTask.class);
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	// OutputFormat instance. volatile, because the asynchronous canceller may access it
 	private volatile OutputFormat<IT> format;
 
@@ -83,6 +84,15 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 	
 	private volatile boolean cleanupCalled;
 
+	/**
+	 * Creates the data sink task, passing the given environment on to the {@link AbstractInvokable} base class.
+	 *
+	 * @param environment The environment assigned to this invokable.
+	 */
+	public DataSinkTask(Environment environment) {
+		super(environment);
+	}
+
 	@Override
 	public void invoke() throws Exception {
 		// --------------------------------------------------------------------
@@ -298,7 +308,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		
 		LOG.debug(getLogString("Cancelling data sink operator"));
 	}
-	
+
 	/**
 	 * Initializes the OutputFormat implementation and configuration.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 1437877..cdfe1fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +81,15 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 	// cancel flag
 	private volatile boolean taskCanceled = false;
 
+	/**
+	 * Creates the data source task, passing the given environment on to the {@link AbstractInvokable} base class.
+	 *
+	 * @param environment The environment assigned to this invokable.
+	 */
+	public DataSourceTask(Environment environment) {
+		super(environment);
+	}
+
 	@Override
 	public void invoke() throws Exception {
 		// --------------------------------------------------------------------
@@ -244,7 +254,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		this.taskCanceled = true;
 		LOG.debug(getLogString("Cancelling data source operator"));
 	}
-	
+
 	/**
 	 * Initializes the InputFormat implementation and configuration.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3c1e98e..16437d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -36,7 +36,6 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -62,13 +61,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.WrappingRuntimeException;
@@ -80,6 +79,8 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.util.Collection;
 import java.util.HashMap;
@@ -589,9 +590,6 @@ public class Task implements Runnable, TaskActions {
 				taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
 			}
 
-			// now load the task's invokable code
-			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
-
 			if (isCanceledOrFailed()) {
 				throw new CancelTaskException();
 			}
@@ -653,8 +651,7 @@ public class Task implements Runnable, TaskActions {
 			//  call the user code initialization methods
 			// ----------------------------------------------------------------
 
-			TaskKvStateRegistry kvStateRegistry = network
-					.createKvStateTaskRegistry(jobId, getJobVertexId());
+			TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
 
 			Environment env = new RuntimeEnvironment(
 				jobId,
@@ -680,25 +677,8 @@ public class Task implements Runnable, TaskActions {
 				metrics,
 				this);
 
-			// let the task code create its readers and writers
-			invokable.setEnvironment(env);
-
-			// the very last thing before the actual execution starts running is to inject
-			// the state into the task. the state is non-empty if this is an execution
-			// of a task that failed but had backuped state from a checkpoint
-
-			if (null != taskStateHandles) {
-				if (invokable instanceof StatefulTask) {
-					StatefulTask op = (StatefulTask) invokable;
-					op.setInitialState(taskStateHandles);
-				} else {
-					throw new IllegalStateException("Found operator state for a non-stateful task invokable");
-				}
-				// be memory and GC friendly - since the code stays in invoke() for a potentially long time,
-				// we clear the reference to the state handle
-				//noinspection UnusedAssignment
-				taskStateHandles = null;
-			}
+			// now load and instantiate the task's invokable code
+			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env, taskStateHandles);
 
 			// ----------------------------------------------------------------
 			//  actual task core work
@@ -893,23 +873,6 @@ public class Task implements Runnable, TaskActions {
 		return userCodeClassLoader;
 	}
 
-	private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className) throws Exception {
-		Class<? extends AbstractInvokable> invokableClass;
-		try {
-			invokableClass = Class.forName(className, true, classLoader)
-					.asSubclass(AbstractInvokable.class);
-		}
-		catch (Throwable t) {
-			throw new Exception("Could not load the task's invokable class.", t);
-		}
-		try {
-			return invokableClass.newInstance();
-		}
-		catch (Throwable t) {
-			throw new Exception("Could not instantiate the task's invokable class.", t);
-		}
-	}
-
 	private void removeCachedFiles(Map<String, Future<Path>> entries, FileCache fileCache) {
 		// cancel and release all distributed cache files
 		try {
@@ -991,8 +954,8 @@ public class Task implements Runnable, TaskActions {
 				@Override
 				public void run() {
 					try {
-						((StoppableTask)invokable).stop();
-					} catch(RuntimeException e) {
+						((StoppableTask) invokable).stop();
+					} catch (RuntimeException e) {
 						LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e);
 						taskManagerActions.failTask(executionId, e);
 					}
@@ -1172,8 +1135,7 @@ public class Task implements Runnable, TaskActions {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Calls the invokable to trigger a checkpoint, if the invokable implements the interface
-	 * {@link StatefulTask}.
+	 * Calls the invokable to trigger a checkpoint.
 	 * 
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.
@@ -1188,52 +1150,43 @@ public class Task implements Runnable, TaskActions {
 		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
 
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
-			if (invokable instanceof StatefulTask) {
-				// build a local closure
-				final StatefulTask statefulTask = (StatefulTask) invokable;
-				final String taskName = taskNameWithSubtask;
-				final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
-					FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
-				Runnable runnable = new Runnable() {
-					@Override
-					public void run() {
-						// set safety net from the task's context for checkpointing thread
-						LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
-						FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 
-						try {
-							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
-							if (!success) {
-								checkpointResponder.declineCheckpoint(
-										getJobID(), getExecutionId(), checkpointID,
-										new CheckpointDeclineTaskNotReadyException(taskName));
-							}
+			// build a local closure
+			final String taskName = taskNameWithSubtask;
+			final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
+				FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
+
+			Runnable runnable = new Runnable() {
+				@Override
+				public void run() {
+					// set safety net from the task's context for checkpointing thread
+					LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
+					FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
+
+					try {
+						boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
+						if (!success) {
+							checkpointResponder.declineCheckpoint(
+									getJobID(), getExecutionId(), checkpointID,
+									new CheckpointDeclineTaskNotReadyException(taskName));
 						}
-						catch (Throwable t) {
-							if (getExecutionState() == ExecutionState.RUNNING) {
-								failExternally(new Exception(
-									"Error while triggering checkpoint " + checkpointID + " for " +
-										taskNameWithSubtask, t));
-							} else {
-								LOG.debug("Encountered error while triggering checkpoint {} for " +
-									"{} ({}) while being not in state running.", checkpointID,
-									taskNameWithSubtask, executionId, t);
-							}
-						} finally {
-							FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
+					}
+					catch (Throwable t) {
+						if (getExecutionState() == ExecutionState.RUNNING) {
+							failExternally(new Exception(
+								"Error while triggering checkpoint " + checkpointID + " for " +
+									taskNameWithSubtask, t));
+						} else {
+							LOG.debug("Encountered error while triggering checkpoint {} for " +
+								"{} ({}) while being not in state running.", checkpointID,
+								taskNameWithSubtask, executionId, t);
 						}
+					} finally {
+						FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
 					}
-				};
-				executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
-			}
-			else {
-				checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
-						new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
-				
-				LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
-						taskNameWithSubtask, executionId);
-
-			}
+				}
+			};
+			executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
 		}
 		else {
 			LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
@@ -1245,37 +1198,25 @@ public class Task implements Runnable, TaskActions {
 	}
 
 	public void notifyCheckpointComplete(final long checkpointID) {
-		AbstractInvokable invokable = this.invokable;
+		final AbstractInvokable invokable = this.invokable;
 
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
-			if (invokable instanceof StatefulTask) {
-
-				// build a local closure
-				final StatefulTask statefulTask = (StatefulTask) invokable;
-				final String taskName = taskNameWithSubtask;
 
-				Runnable runnable = new Runnable() {
-					@Override
-					public void run() {
-						try {
-							statefulTask.notifyCheckpointComplete(checkpointID);
-						}
-						catch (Throwable t) {
-							if (getExecutionState() == ExecutionState.RUNNING) {
-								// fail task if checkpoint confirmation failed.
-								failExternally(new RuntimeException(
-									"Error while confirming checkpoint",
-									t));
-							}
+			Runnable runnable = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						invokable.notifyCheckpointComplete(checkpointID);
+					}
+					catch (Throwable t) {
+						if (getExecutionState() == ExecutionState.RUNNING) {
+							// fail task if checkpoint confirmation failed.
+							failExternally(new RuntimeException("Error while confirming checkpoint", t));
 						}
 					}
-				};
-				executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName);
-			}
-			else {
-				LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.",
-						taskNameWithSubtask);
-			}
+				}
+			};
+			executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskNameWithSubtask);
 		}
 		else {
 			LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
@@ -1344,7 +1285,7 @@ public class Task implements Runnable, TaskActions {
 
 	/**
 	 * Utility method to dispatch an asynchronous call on the invokable.
-	 * 
+	 *
 	 * @param runnable The async call runnable.
 	 * @param callName The name of the call, for logging purposes.
 	 */
@@ -1413,6 +1354,88 @@ public class Task implements Runnable, TaskActions {
 	}
 
 	/**
+	 * Instantiates the given task invokable class, passing the given environment (and possibly
+	 * the initial task state) to the task's constructor.
+	 *
+	 * <p>The method will first try to instantiate the task via a constructor accepting both
+	 * the Environment and the TaskStateSnapshot. If no such constructor exists, and there is
+	 * no initial state, the method will fall back to the stateless convenience constructor that
+	 * accepts only the Environment.
+	 *
+	 * @param classLoader The classloader to load the class through.
+	 * @param className The name of the class to load.
+	 * @param environment The task environment.
+	 * @param initialState The task's initial state. Null, if the task is either stateless, or
+	 *                     initialized with empty state.
+	 *
+	 * @return The instantiated invokable task object.
+	 *
+	 * @throws Throwable Forwards, unwrapped, any exception thrown by the task's constructor. Also throws
+	 *                   an exception if the class cannot be loaded or lacks the necessary constructor.
+	 */
+	private static AbstractInvokable loadAndInstantiateInvokable(
+			ClassLoader classLoader,
+			String className,
+			Environment environment,
+			@Nullable TaskStateSnapshot initialState) throws Throwable {
+
+		final Class<? extends AbstractInvokable> invokableClass;
+		try {
+			invokableClass = Class.forName(className, true, classLoader)
+					.asSubclass(AbstractInvokable.class);
+		}
+		catch (Throwable t) {
+			throw new Exception("Could not load the task's invokable class.", t);
+		}
+
+		Constructor<? extends AbstractInvokable> statefulCtor = null;
+		Constructor<? extends AbstractInvokable> statelessCtor = null;
+
+		// first look up the constructor that accepts state; it is invoked further below
+		try {
+			//noinspection JavaReflectionMemberAccess
+			statefulCtor = invokableClass.getConstructor(Environment.class, TaskStateSnapshot.class);
+		}
+		catch (NoSuchMethodException e) {
+			if (initialState == null) {
+				// we also allow the constructor that takes no state, as a convenience for stateless
+				// tasks, so that they are not forced to carry the stateful constructor
+				try {
+					statelessCtor = invokableClass.getConstructor(Environment.class);
+				}
+				catch (NoSuchMethodException ee) {
+					throw new FlinkException("Task misses proper constructor", ee);
+				}
+			}
+			else {
+				throw new FlinkException("Task has state to restore, but misses the stateful constructor", e);
+			}
+		}
+
+		// instantiate the class
+		try {
+			if (statefulCtor != null) {
+				return statefulCtor.newInstance(environment, initialState);
+			}
+			else {
+				//noinspection ConstantConditions  --> cannot happen
+				return statelessCtor.newInstance(environment);
+			}
+		}
+		catch (InvocationTargetException e) {
+			// unwrap and directly forward exceptions thrown by the task's constructor (eager initialization)
+			throw e.getTargetException();
+		}
+		catch (Exception e) {
+			throw new FlinkException("Could not instantiate the task's invokable class.", e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  TaskCanceler
+	// ------------------------------------------------------------------------
+
+	/**
 	 * This runner calls cancel() on the invokable and periodically interrupts the
 	 * thread until it has terminated.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 482290a..7d6c7b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -191,24 +193,20 @@ public class CoordinatorShutdownTest extends TestLogger {
 	}
 
 	public static class BlockingInvokable extends AbstractInvokable {
-		private static boolean blocking = true;
-		private static final Object lock = new Object();
+
+		private static final OneShotLatch LATCH = new OneShotLatch();
+
+		public BlockingInvokable(Environment environment) {
+			super(environment);
+		}
 
 		@Override
 		public void invoke() throws Exception {
-			while (blocking) {
-				synchronized (lock) {
-					lock.wait();
-				}
-			}
+			LATCH.await();
 		}
 
 		public static void unblock() {
-			blocking = false;
-
-			synchronized (lock) {
-				lock.notifyAll();
-			}
+			LATCH.trigger();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
index 301d206..ff15d67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
@@ -23,6 +23,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -142,6 +143,10 @@ public class JobClientActorRecoveryITCase extends TestLogger {
 		private volatile static int HasBlockedExecution = 0;
 		private static Object waitLock = new Object();
 
+		public BlockingTask(Environment environment) {
+			super(environment);
+		}
+
 		@Override
 		public void invoke() throws Exception {
 			if (BlockExecution > 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index f57726c..9481d57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -110,6 +111,10 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 	 */
 	public static class SlowBufferSender extends AbstractInvokable {
 
+		public SlowBufferSender(Environment environment) {
+			super(environment);
+		}
+
 		@Override
 		public void invoke() throws Exception {
 			final ResultPartitionWriter writer = getEnvironment().getWriter(0);
@@ -128,6 +133,10 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 	 */
 	public static class SingleBufferReceiver extends AbstractInvokable {
 
+		public SingleBufferReceiver(Environment environment) {
+			super(environment);
+		}
+
 		@Override
 		public void invoke() throws Exception {
 			InputGate gate = getEnvironment().getInputGate(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 12bb95e..79f7342 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -56,7 +58,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -97,6 +98,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -120,6 +122,8 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.runtime.BoxedUnit;
 
+import javax.annotation.Nullable;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -486,28 +490,23 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
 	public static class BlockingInvokable extends AbstractInvokable {
 
-		private static boolean blocking = true;
-		private static Object lock = new Object();
+		private static final OneShotLatch LATCH = new OneShotLatch();
+
+		public BlockingInvokable(Environment environment, @Nullable TaskStateSnapshot initialState) {
+			super(environment);
+		}
 
 		@Override
 		public void invoke() throws Exception {
-			while (blocking) {
-				synchronized (lock) {
-					lock.wait();
-				}
-			}
+			LATCH.await();
 		}
 
 		public static void unblock() {
-			blocking = false;
-
-			synchronized (lock) {
-				lock.notifyAll();
-			}
+			LATCH.trigger();
 		}
 	}
 
-	public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask {
+	public static class BlockingStatefulInvokable extends BlockingInvokable {
 
 		private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
 
@@ -517,15 +516,18 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
 		private int completedCheckpoints = 0;
 
-		@Override
-		public void setInitialState(
-			TaskStateSnapshot taskStateHandles) throws Exception {
+		public BlockingStatefulInvokable(Environment environment, @Nullable TaskStateSnapshot initialState) {
+			super(environment, initialState);
+
 			int subtaskIndex = getIndexInSubtaskGroup();
-			if (subtaskIndex < recoveredStates.length) {
-				OperatorStateHandle operatorStateHandle = extractSingletonOperatorState(taskStateHandles);
+			if (initialState != null && subtaskIndex < recoveredStates.length) {
+				OperatorStateHandle operatorStateHandle = extractSingletonOperatorState(initialState);
 				try (FSDataInputStream in = operatorStateHandle.openInputStream()) {
 					recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(in, getUserCodeClassLoader());
 				}
+				catch (IOException | ClassNotFoundException e) {
+					throw new RuntimeException(e.getMessage(), e);
+				}
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 5bc207a..ecf2ae3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManage
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -61,7 +62,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
@@ -1525,19 +1525,24 @@ public class JobManagerTest extends TestLogger {
 	/**
 	 * A blocking stateful source task that declines savepoints.
 	 */
-	public static class FailOnSavepointSourceTask extends AbstractInvokable implements StatefulTask {
+	public static class FailOnSavepointSourceTask extends AbstractInvokable {
 
 		private static final CountDownLatch CHECKPOINT_AFTER_SAVEPOINT_LATCH = new CountDownLatch(1);
 
 		private boolean receivedSavepoint;
 
-		@Override
-		public void invoke() throws Exception {
-			new CountDownLatch(1).await();
+		/**
+		 * Creates the task with the given environment; the {@code initialState} argument is ignored.
+		 *
+		 * @param environment The environment assigned to this invokable.
+		 */
+		public FailOnSavepointSourceTask(Environment environment, TaskStateSnapshot initialState) {
+			super(environment);
 		}
 
 		@Override
-		public void setInitialState(TaskStateSnapshot taskStateHandles) throws Exception {
+		public void invoke() throws Exception {
+			new CountDownLatch(1).await();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 49b11b5..4ce83cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -19,18 +19,19 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.types.IntValue;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -40,9 +41,9 @@ import java.util.BitSet;
 public class SlotCountExceedingParallelismTest extends TestLogger {
 
 	// Test configuration
-	private final static int NUMBER_OF_TMS = 2;
-	private final static int NUMBER_OF_SLOTS_PER_TM = 2;
-	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+	private static final int NUMBER_OF_TMS = 2;
+	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
 	private static TestingCluster flink;
 
@@ -120,7 +121,11 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
 	 */
 	public static class RoundRobinSubtaskIndexSender extends AbstractInvokable {
 
-		public final static String CONFIG_KEY = "number-of-times-to-send";
+		public static final String CONFIG_KEY = "number-of-times-to-send";
+
+		public RoundRobinSubtaskIndexSender(Environment environment) {
+			super(environment);
+		}
 
 		@Override
 		public void invoke() throws Exception {
@@ -147,7 +152,11 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
 	 */
 	public static class SubtaskIndexReceiver extends AbstractInvokable {
 
-		public final static String CONFIG_KEY = "number-of-indexes-to-receive";
+		public static final String CONFIG_KEY = "number-of-indexes-to-receive";
+
+		public SubtaskIndexReceiver(Environment environment) {
+			super(environment);
+		}
 
 		@Override
 		public void invoke() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index d861455..902c925 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -42,9 +43,9 @@ import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismT
 
 public class ScheduleOrUpdateConsumersTest extends TestLogger {
 
-	private final static int NUMBER_OF_TMS = 2;
-	private final static int NUMBER_OF_SLOTS_PER_TM = 2;
-	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+	private static final int NUMBER_OF_TMS = 2;
+	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
 	private static TestingCluster flink;
 
@@ -124,7 +125,11 @@ public class ScheduleOrUpdateConsumersTest extends TestLogger {
 
 	public static class BinaryRoundRobinSubtaskIndexSender extends AbstractInvokable {
 
-		public final static String CONFIG_KEY = "number-of-times-to-send";
+		public static final String CONFIG_KEY = "number-of-times-to-send";
+
+		public BinaryRoundRobinSubtaskIndexSender(Environment environment) {
+			super(environment);
+		}
 
 		@Override
 		public void invoke() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 1fd004f..c92b2f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -77,9 +77,9 @@ public class DataSinkTaskTest extends TaskTestBase {
 			super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 			super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
-			DataSinkTask<Record> testTask = new DataSinkTask<>();
+			DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
 
-			super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
+			super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString());
 
 			testTask.invoke();
 
@@ -139,9 +139,9 @@ public class DataSinkTaskTest extends TaskTestBase {
 		readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false);
 		readers[3] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 3, 0, false), 0, false);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
 
-		super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
+		super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString());
 
 		try {
 			// For the union reader to work, we need to start notifications *after* the union reader
@@ -215,7 +215,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
 
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
@@ -225,7 +225,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
-		super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
+		super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString());
 
 		try {
 			testTask.invoke();
@@ -293,11 +293,11 @@ public class DataSinkTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
-		super.registerFileOutputTask(testTask, MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
+		super.registerFileOutputTask(MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
 
 		boolean stubFailed = false;
 
@@ -325,7 +325,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
@@ -337,7 +337,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
-		super.registerFileOutputTask(testTask, MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
+		super.registerFileOutputTask(MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
 
 		boolean stubFailed = false;
 
@@ -359,11 +359,11 @@ public class DataSinkTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 
-		final DataSinkTask<Record> testTask = new DataSinkTask<>();
+		final DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
-		super.registerFileOutputTask(testTask, MockOutputFormat.class,  new File(tempTestPath).toURI().toString());
+		super.registerFileOutputTask(MockOutputFormat.class,  new File(tempTestPath).toURI().toString());
 
 		Thread taskRunner = new Thread() {
 			@Override
@@ -407,7 +407,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 
-		final DataSinkTask<Record> testTask = new DataSinkTask<>();
+		final DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
@@ -419,7 +419,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
-		super.registerFileOutputTask(testTask, MockOutputFormat.class,  new File(tempTestPath).toURI().toString());
+		super.registerFileOutputTask(MockOutputFormat.class,  new File(tempTestPath).toURI().toString());
 
 		Thread taskRunner = new Thread() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 82f3d1d..9a3c956 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -78,8 +78,8 @@ public class DataSourceTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addOutput(this.outList);
 		
-		DataSourceTask<Record> testTask = new DataSourceTask<>();
-		
+		DataSourceTask<Record> testTask = new DataSourceTask<>(this.mockEnv);
+
 		super.registerFileInputTask(testTask, MockInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
 		
 		try {
@@ -144,7 +144,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addOutput(this.outList);
 		
-		DataSourceTask<Record> testTask = new DataSourceTask<>();
+		DataSourceTask<Record> testTask = new DataSourceTask<>(this.mockEnv);
 
 		super.registerFileInputTask(testTask, MockFailingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
 		
@@ -178,7 +178,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 			Assert.fail("Unable to set-up test input file");
 		}
 		
-		final DataSourceTask<Record> testTask = new DataSourceTask<>();
+		final DataSourceTask<Record> testTask = new DataSourceTask<>(this.mockEnv);
 
 		super.registerFileInputTask(testTask, MockDelayingInputFormat.class,  new File(tempTestPath).toURI().toString(), "\n");
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/InvokableClassConstructorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/InvokableClassConstructorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/InvokableClassConstructorTest.java
new file mode 100644
index 0000000..5880f05
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/InvokableClassConstructorTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationIntermediateTask;
+import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that stateless task implementations have an Environment-only constructor and declare no (Environment, TaskStateSnapshot) constructor.
+ */
+public class InvokableClassConstructorTest {
+
+	private static final Class<?>[] STATELESS_TASKS = {
+		IterationHeadTask.class,
+		IterationIntermediateTask.class,
+		IterationTailTask.class,
+		IterationSynchronizationSinkTask.class,
+		DataSourceTask.class,
+		DataSinkTask.class
+	};
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testNoStatefulConstructor() throws Exception {
+		for (Class<?> clazz: STATELESS_TASKS) {
+
+			// check that there is a constructor for Environment only
+			clazz.getConstructor(Environment.class);
+
+			try {
+				// check that there is NO constructor for Environment and Task State
+				clazz.getDeclaredConstructor(Environment.class, TaskStateSnapshot.class);
+				fail("Should fail with an exception");
+			}
+			catch (NoSuchMethodException e) {
+				// expected
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 0a9efb0..6f15884 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
 import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.BatchTask;
@@ -41,6 +40,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -96,9 +96,8 @@ public class ChainTaskTest extends TaskTestBase {
 			
 			// chained map+combine
 			{
-				BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
-											new BatchTask<>();
-				registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
+				registerTask(FlatMapDriver.class, MockMapStub.class);
+				BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv);
 				
 				try {
 					testTask.invoke();
@@ -157,19 +156,16 @@ public class ChainTaskTest extends TaskTestBase {
 			
 			// chained map+combine
 			{
-				final BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
-											new BatchTask<>();
-				
-				super.registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
-	
+				registerTask(FlatMapDriver.class, MockMapStub.class);
+				final BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv);
+
 				boolean stubFailed = false;
-				
 				try {
 					testTask.invoke();
 				} catch (Exception e) {
 					stubFailed = true;
 				}
-				
+
 				Assert.assertTrue("Function exception was not forwarded.", stubFailed);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java
index 51ac665..a81e959 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java
@@ -92,8 +92,8 @@ public class ChainedAllReduceDriverTest extends TaskTestBase {
 
 			// chained map+reduce
 			{
-				BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>();
-				registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
+				registerTask(FlatMapDriver.class, MockMapStub.class);
+				BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(mockEnv);
 
 				try {
 					testTask.invoke();

http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java
index c47ab33..cb0faf0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java
@@ -20,13 +20,25 @@ package org.apache.flink.runtime.operators.testutils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
+import javax.annotation.Nullable;
+
 /**
  * An invokable that does nothing.
  */
 public class DummyInvokable extends AbstractInvokable {
 
+	public DummyInvokable() {
+		super(new DummyEnvironment("test", 1, 0));
+	}
+
+	public DummyInvokable(Environment environment, @Nullable TaskStateSnapshot initialState) {
+		super(environment);
+	}
+
 	@Override
 	public void invoke() {}