You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/27 15:18:23 UTC

[1/3] flink git commit: [hotfix] [tests] Speed up streaming state tests by skipping default retry delay.

Repository: flink
Updated Branches:
  refs/heads/master e5d62da2c -> 6e123d287


[hotfix] [tests] Speed up streaming state tests by skipping default retry delay.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e123d28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e123d28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e123d28

Branch: refs/heads/master
Commit: 6e123d287443430bf1721952c5692069e41d95cc
Parents: b1642e3
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 27 14:53:57 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 27 14:58:41 2016 +0200

----------------------------------------------------------------------
 .../flink/test/checkpointing/StreamFaultToleranceTestBase.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e123d28/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 5f6cd4a..10f78d4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.test.checkpointing;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -98,6 +99,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 			env.setParallelism(PARALLELISM);
 			env.enableCheckpointing(500);
 			env.getConfig().disableSysoutLogging();
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
 
 			testProgram(env);
 


[2/3] flink git commit: [FLINK-4685] [checkpoints] Gather sync/async duration and alignment information for task checkpoints

Posted by se...@apache.org.
[FLINK-4685] [checkpoints] Gather sync/async duration and alignment information for task checkpoints

This adds the following information to each 'acknowledge checkpoint' message:
  - number of bytes buffered during alignment
  - duration of alignment phase
  - duration of synchronous part of the operator checkpoint
  - duration of asynchronous part of the operator checkpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1642e32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1642e32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1642e32

Branch: refs/heads/master
Commit: b1642e32c2f69c60c2b212260c3479feb66a9165
Parents: 6ea9284
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 26 14:10:21 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 27 14:58:41 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         | 19 ++--
 .../flink/runtime/execution/Environment.java    | 43 +++++++--
 .../runtime/jobgraph/tasks/StatefulTask.java    | 27 ++++--
 .../checkpoint/AcknowledgeCheckpoint.java       | 93 +++++++++++++++++---
 .../ActorGatewayCheckpointResponder.java        | 23 ++---
 .../taskmanager/CheckpointResponder.java        | 29 ++++--
 .../runtime/taskmanager/RuntimeEnvironment.java | 27 ++++--
 .../jobmanager/JobManagerHARecoveryTest.java    | 11 ++-
 .../operators/testutils/DummyEnvironment.java   | 14 +--
 .../operators/testutils/MockEnvironment.java    | 15 ++--
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  6 ++
 .../streaming/runtime/io/BarrierBuffer.java     | 83 +++++++++++------
 .../streaming/runtime/io/BarrierTracker.java    | 44 +++++----
 .../streaming/runtime/io/BufferSpiller.java     | 36 +++++---
 .../runtime/io/CheckpointBarrierHandler.java    | 30 +++++--
 .../runtime/io/StreamInputProcessor.java        | 24 +++--
 .../runtime/io/StreamTwoInputProcessor.java     | 16 ++--
 .../runtime/tasks/OneInputStreamTask.java       |  2 +-
 .../streaming/runtime/tasks/StreamTask.java     | 87 ++++++++++++------
 .../runtime/tasks/TwoInputStreamTask.java       |  2 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 88 ++++++++++++++----
 .../runtime/io/BarrierTrackerTest.java          | 40 +++++++--
 .../runtime/tasks/OneInputStreamTaskTest.java   | 21 +++--
 .../runtime/tasks/StreamMockEnvironment.java    | 14 +--
 24 files changed, 576 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index d5b9b46..c0c9ca1 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -136,15 +136,16 @@ public class RocksDBAsyncSnapshotTest {
 				testHarness.bufferSize) {
 
 			@Override
-			public void acknowledgeCheckpoint(long checkpointId) {
-				super.acknowledgeCheckpoint(checkpointId);
-			}
-
-			@Override
-			public void acknowledgeCheckpoint(long checkpointId,
-			                                  ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			                                  List<KeyGroupsStateHandle> keyGroupStateHandles) {
-				super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles);
+			public void acknowledgeCheckpoint(
+					long checkpointId,
+					ChainedStateHandle<StreamStateHandle> chainedStateHandle, 
+					List<KeyGroupsStateHandle> keyGroupStateHandles,
+					long synchronousDurationMillis, long asynchronousDurationMillis,
+					long bytesBufferedInAlignment, long alignmentDurationNanos) {
+
+				super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles,
+						synchronousDurationMillis, asynchronousDurationMillis,
+						bytesBufferedInAlignment, alignmentDurationNanos);
 
 				// block on the latch, to verify that triggerCheckpoint returns below,
 				// even though the async checkpoint would not finish

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 1eee9d4..273c0d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -164,23 +164,52 @@ public interface Environment {
 	 * to for the checkpoint with the give checkpoint-ID. This method does not include
 	 * any state in the checkpoint.
 	 * 
-	 * @param checkpointId The ID of the checkpoint.
+	 * @param checkpointId
+	 *             The ID of the checkpoint.
+	 * @param synchronousDurationMillis
+	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
+	 * @param asynchronousDurationMillis
+	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
+	 * @param bytesBufferedInAlignment
+	 *             The number of bytes that were buffered during the checkpoint alignment phase
+	 * @param alignmentDurationNanos
+	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took   
 	 */
-	void acknowledgeCheckpoint(long checkpointId);
+	void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos);
 
 	/**
 	 * Confirms that the invokable has successfully completed all required steps for
 	 * the checkpoint with the give checkpoint-ID. This method does include
 	 * the given state in the checkpoint.
 	 *
-	 * @param checkpointId The ID of the checkpoint.
-	 * @param chainedStateHandle Handle for the chained operator state
-	 * @param keyGroupStateHandles  Handles for key group state
+	 * @param checkpointId
+	 *             The ID of the checkpoint.
+	 * @param chainedStateHandle
+	 *             Handle for the chained operator state
+	 * @param keyGroupStateHandles
+	 *             Handles for key group state
+	 * @param synchronousDurationMillis
+	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
+	 * @param asynchronousDurationMillis
+	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
+	 * @param bytesBufferedInAlignment
+	 *             The number of bytes that were buffered during the checkpoint alignment phase
+	 * @param alignmentDurationNanos
+	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took   
 	 */
 	void acknowledgeCheckpoint(
 			long checkpointId,
 			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles);
+			List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos);
 
 	/**
 	 * Marks task execution failed for an external reason (a reason other than the task code itself
@@ -189,7 +218,7 @@ public interface Environment {
 	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
 	 * starts an asynchronous thread that aborts that code.
 	 *
-	 * <p>This method never blocks.</p>
+	 * <p>This method never blocks.
 	 */
 	void failExternally(Throwable cause);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index cab7ed6..9ddfdf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
-
 import java.util.List;
 
 /**
@@ -41,18 +40,34 @@ public interface StatefulTask {
 	void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception;
 
 	/**
-	 * This method is either called directly and asynchronously by the checkpoint
-	 * coordinator (in the case of functions that are directly notified - usually
-	 * the data sources), or called synchronously when all incoming channels have
-	 * reported a checkpoint barrier.
+	 * This method is called to trigger a checkpoint, asynchronously by the checkpoint
+	 * coordinator.
+	 * 
+	 * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
+	 * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
+	 * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(long, long, long, long)}
+	 * method.
 	 *
-	 * @param checkpointId The ID of the checkpoint, incrementing.
+	 * @param checkpointId The ID of the checkpoint, strictly incrementing.
 	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
 	 *
 	 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
 	 */
 	boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
 
+	/**
+	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
+	 * barriers on all input streams.
+	 * 
+	 * @param checkpointId The ID of the checkpoint, strictly incrementing.
+	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+	 * @param bytesAligned The number of bytes that were buffered during the alignment of the streams.
+	 * @param alignmentTimeNanos The time that the stream alignment took, in nanoseconds.   
+	 * 
+	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
+	 */
+	void triggerCheckpointOnBarrier(
+			long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception;
 
 	/**
 	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index 0c56603..72396eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
  * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
@@ -37,27 +39,74 @@ import java.util.List;
 public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
 
 	private static final long serialVersionUID = -7606214777192401493L;
-	
+
 	private final ChainedStateHandle<StreamStateHandle> stateHandle;
 
 	private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
 
-	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+	/** The duration (in milliseconds) that the synchronous part of the checkpoint took */
+	private final long synchronousDurationMillis;
+
+	/** The duration (in milliseconds) that the asynchronous part of the checkpoint took */
+	private final long asynchronousDurationMillis;
+
+	/** The number of bytes that were buffered during the checkpoint alignment phase */
+	private final long bytesBufferedInAlignment;
+
+	/** The duration (in nanoseconds) that the alignment phase of the task's checkpoint took */
+	private final long alignmentDurationNanos;
+
+	// ------------------------------------------------------------------------
+
+	public AcknowledgeCheckpoint(
+			JobID job,
+			ExecutionAttemptID taskExecutionId,
+			long checkpointId) {
 		this(job, taskExecutionId, checkpointId, null, null);
 	}
 
 	public AcknowledgeCheckpoint(
-		JobID job,
-		ExecutionAttemptID taskExecutionId,
-		long checkpointId,
-		ChainedStateHandle<StreamStateHandle> state,
-		List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
+			JobID job,
+			ExecutionAttemptID taskExecutionId,
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> state,
+			List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
+		this(job, taskExecutionId, checkpointId, state, keyGroupStateAndSizes, -1L, -1L, -1L, -1L);
+	}
+
+	public AcknowledgeCheckpoint(
+			JobID job,
+			ExecutionAttemptID taskExecutionId,
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> state,
+			List<KeyGroupsStateHandle> keyGroupStateAndSizes,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
 
 		super(job, taskExecutionId, checkpointId);
+
+		// these may be null in cases where the operator has no state
 		this.stateHandle = state;
 		this.keyGroupsStateHandle = keyGroupStateAndSizes;
+
+		// these may be "-1", in case the values are unknown or not set
+		checkArgument(synchronousDurationMillis >= -1);
+		checkArgument(asynchronousDurationMillis >= -1);
+		checkArgument(bytesBufferedInAlignment >= -1);
+		checkArgument(alignmentDurationNanos >= -1);
+
+		this.synchronousDurationMillis = synchronousDurationMillis;
+		this.asynchronousDurationMillis = asynchronousDurationMillis;
+		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+		this.alignmentDurationNanos = alignmentDurationNanos;
 	}
 
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
+
 	public ChainedStateHandle<StreamStateHandle> getStateHandle() {
 		return stateHandle;
 	}
@@ -66,8 +115,29 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 		return keyGroupsStateHandle;
 	}
 
+	public long getSynchronousDurationMillis() {
+		return synchronousDurationMillis;
+	}
+
+	public long getAsynchronousDurationMillis() {
+		return asynchronousDurationMillis;
+	}
+
+	public long getBytesBufferedInAlignment() {
+		return bytesBufferedInAlignment;
+	}
+
+	public long getAlignmentDurationNanos() {
+		return alignmentDurationNanos;
+	}
+
 	// --------------------------------------------------------------------------------------------
-	
+
+	@Override
+	public int hashCode() {
+		return super.hashCode();
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -76,9 +146,10 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 		else if (o instanceof AcknowledgeCheckpoint) {
 			AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
 			return super.equals(o) &&
-					(this.stateHandle == null ? that.stateHandle == null : (that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
-					(this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null : (that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
-
+					(this.stateHandle == null ? that.stateHandle == null : 
+							(that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
+					(this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null : 
+							(that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
 		}
 		else {
 			return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index 56e5922..c317bed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -43,18 +43,21 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
 
 	@Override
 	public void acknowledgeCheckpoint(
-		JobID jobID,
-		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-		List<KeyGroupsStateHandle> keyGroupStateHandles) {
+			JobID jobID,
+			ExecutionAttemptID executionAttemptID,
+			long checkpointID,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+			List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
 
 		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
-			jobID,
-			executionAttemptID,
-			checkpointID,
-			chainedStateHandle,
-			keyGroupStateHandles);
+				jobID, executionAttemptID, checkpointID,
+				chainedStateHandle, keyGroupStateHandles,
+				synchronousDurationMillis, asynchronousDurationMillis,
+				bytesBufferedInAlignment, alignmentDurationNanos);
 
 		actorGateway.tell(message);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index 9d5c4e1..b3f9827 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -34,18 +34,35 @@ public interface CheckpointResponder {
 	/**
 	 * Acknowledges the given checkpoint.
 	 *
-	 * @param jobID Job ID of the running job
-	 * @param executionAttemptID Execution attempt ID of the running task
-	 * @param checkpointID Checkpoint ID of the checkpoint
-	 * @param chainedStateHandle Chained state handle
-	 * @param keyGroupStateHandles State handles for key groups
+	 * @param jobID
+	 *             Job ID of the running job
+	 * @param executionAttemptID
+	 *             Execution attempt ID of the running task
+	 * @param checkpointID
+	 *             Checkpoint ID of the checkpoint
+	 * @param chainedStateHandle
+	 *             Chained state handle
+	 * @param keyGroupStateHandles
+	 *             State handles for key groups
+	 * @param synchronousDurationMillis
+	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
+	 * @param asynchronousDurationMillis
+	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
+	 * @param bytesBufferedInAlignment
+	 *             The number of bytes that were buffered during the checkpoint alignment phase
+	 * @param alignmentDurationNanos
+	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took
 	 */
 	void acknowledgeCheckpoint(
 		JobID jobID,
 		ExecutionAttemptID executionAttemptID,
 		long checkpointID,
 		ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-		List<KeyGroupsStateHandle> keyGroupStateHandles);
+		List<KeyGroupsStateHandle> keyGroupStateHandles,
+		long synchronousDurationMillis,
+		long asynchronousDurationMillis,
+		long bytesBufferedInAlignment,
+		long alignmentDurationNanos);
 
 	/**
 	 * Declines the given checkpoint.

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 3e4ba4d..23b6f82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -239,23 +239,34 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
-		acknowledgeCheckpoint(checkpointId, null, null);
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
+
+		acknowledgeCheckpoint(checkpointId, null, null,
+				synchronousDurationMillis, asynchronousDurationMillis,
+				bytesBufferedInAlignment, alignmentDurationNanos);
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
 			long checkpointId,
 			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles) {
+			List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
 
 
 		checkpointResponder.acknowledgeCheckpoint(
-			jobId,
-			executionId,
-			checkpointId,
-			chainedStateHandle,
-			keyGroupStateHandles);
+				jobId, executionId, checkpointId,
+				chainedStateHandle, keyGroupStateHandles,
+				synchronousDurationMillis, asynchronousDurationMillis,
+				bytesBufferedInAlignment, alignmentDurationNanos);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 6b80c3d..ef8e3bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -453,12 +453,15 @@ public class JobManagerHARecoveryTest {
 			try {
 				ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
 						InstantiationUtil.serializeObject(checkpointId));
+
 				RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
 				ChainedStateHandle<StreamStateHandle> chainedStateHandle = new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+
 				getEnvironment().acknowledgeCheckpoint(
 						checkpointId,
 						chainedStateHandle,
-						Collections.<KeyGroupsStateHandle>emptyList());
+						Collections.<KeyGroupsStateHandle>emptyList(),
+						0L, 0L, 0L, 0L);
 				return true;
 			} catch (Exception ex) {
 				throw new RuntimeException(ex);
@@ -466,6 +469,12 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
+		public void triggerCheckpointOnBarrier(
+				long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+			throw new UnsupportedOperationException("should not be called!");
+		}
+
+		@Override
 		public void notifyCheckpointComplete(long checkpointId) {
 			if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
 				completedCheckpointsLatch.countDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 4654507..a857d1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -153,14 +153,18 @@ public class DummyEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
-
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId,
-			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles) {
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index e7bf6e1..75e88eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -314,15 +314,18 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
-		throw new UnsupportedOperationException();
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId,
-			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles) {
-		throw new UnsupportedOperationException();
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index a5f4019..ed30fd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -228,6 +228,12 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
+		public void triggerCheckpointOnBarrier(
+				long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+			throw new UnsupportedOperationException("Should not be called");
+		}
+
+		@Override
 		public void notifyCheckpointComplete(long checkpointId) {
 			if (checkpointId != lastCheckpointId && this.error == null) {
 				this.error = new Exception("calls out of order");

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index dcd76c6..d60c999 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.slf4j.Logger;
@@ -37,22 +37,22 @@ import org.slf4j.LoggerFactory;
  * 
  * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
  * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until 
- * the blocks are released.</p>
+ * the blocks are released.
  */
 @Internal
 public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-	
+
 	/** The gate that the buffer draws its input from */
 	private final InputGate inputGate;
 
 	/** Flags that indicate whether a channel is currently blocked/buffered */
 	private final boolean[] blockedChannels;
-	
+
 	/** The total number of channels that this buffer handles data from */
 	private final int totalNumberOfInputChannels;
-	
+
 	/** To utility to write blocked data to a file channel */
 	private final BufferSpiller bufferSpiller;
 
@@ -65,17 +65,23 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
 
 	/** Handler that receives the checkpoint notifications */
-	private EventListener<CheckpointBarrier> checkpointHandler;
+	private StatefulTask toNotifyOnCheckpoint;
 
 	/** The ID of the checkpoint for which we expect barriers */
 	private long currentCheckpointId = -1L;
 
 	/** The number of received barriers (= number of blocked/buffered channels) */
 	private int numBarriersReceived;
-	
+
 	/** The number of already closed channels */
 	private int numClosedChannels;
-	
+
+	/** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
+	private long startOfAlignmentTimestamp;
+
+	/** The time (in nanoseconds) that the latest alignment took */
+	private long latestAlignmentDurationNanos;
+
 	/** Flag to indicate whether we have drawn all available input */
 	private boolean endOfStream;
 
@@ -100,7 +106,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+	public BufferOrEvent getNextNonBlocked() throws Exception {
 		while (true) {
 			// process buffered BufferOrEvents before grabbing new ones
 			BufferOrEvent next;
@@ -114,7 +120,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 					return getNextNonBlocked();
 				}
 			}
-			
+
 			if (next != null) {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked we, we just store the BufferOrEvent
@@ -139,12 +145,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				}
 			}
 			else if (!endOfStream) {
-				// end of stream. we feed the data that is still buffered
+				// end of input stream. stream continues with the buffered data
 				endOfStream = true;
 				releaseBlocks();
 				return getNextNonBlocked();
 			}
 			else {
+				// final end of both input and buffered data
 				return null;
 			}
 		}
@@ -158,7 +165,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 	}
 	
-	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
 		final long barrierId = receivedBarrier.getId();
 
 		if (numBarriersReceived > 0) {
@@ -175,6 +182,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				releaseBlocks();
 				currentCheckpointId = barrierId;
 				onBarrier(channelIndex);
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Starting stream alignment for checkpoint {}", barrierId);
+				}
+				startOfAlignmentTimestamp = System.nanoTime();
 			}
 			else {
 				// ignore trailing barrier from aborted checkpoint
@@ -186,6 +198,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			// first barrier of a new checkpoint
 			currentCheckpointId = barrierId;
 			onBarrier(channelIndex);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Starting stream alignment for checkpoint {}", barrierId);
+			}
+			startOfAlignmentTimestamp = System.nanoTime();
 		}
 		else {
 			// trailing barrier from previous (skipped) checkpoint
@@ -199,21 +216,23 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 						receivedBarrier.getId(), receivedBarrier.getTimestamp());
 			}
 
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
-			}
-			
 			releaseBlocks();
+
+			if (toNotifyOnCheckpoint != null) {
+				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+						receivedBarrier.getId(), receivedBarrier.getTimestamp(),
+						bufferSpiller.getBytesWritten(), latestAlignmentDurationNanos);
+			}
 		}
 	}
 	
 	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
+	public void registerCheckpointEventHandler(StatefulTask toNotifyOnCheckpoint) {
+		if (this.toNotifyOnCheckpoint == null) {
+			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 		}
 		else {
-			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
 		}
 	}
 	
@@ -267,9 +286,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 * is the next to be consumed.
 	 */
 	private void releaseBlocks() throws IOException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Releasing blocks");
-		}
+		LOG.debug("End of stream alignment, feeding buffered data back");
 
 		for (int i = 0; i < blockedChannels.length; i++) {
 			blockedChannels[i] = false;
@@ -295,10 +312,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				currentBuffered = bufferedNow;
 			}
 		}
+
+		final long now = System.nanoTime();
+		if (startOfAlignmentTimestamp > 0) {
+			latestAlignmentDurationNanos = now - startOfAlignmentTimestamp;
+			startOfAlignmentTimestamp = 0;
+		}
 	}
 
 	// ------------------------------------------------------------------------
-	// For Testing
+	//  Properties
 	// ------------------------------------------------------------------------
 
 	/**
@@ -309,7 +332,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	public long getCurrentCheckpointId() {
 		return this.currentCheckpointId;
 	}
-	
+
+	@Override
+	public long getAlignmentDurationNanos() {
+		long start = this.startOfAlignmentTimestamp;
+		if (start <= 0) {
+			return latestAlignmentDurationNanos;
+		} else {
+			return System.nanoTime() - start;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Utilities 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9c9ec4f..1db5845 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -21,10 +21,9 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
-import java.io.IOException;
 import java.util.ArrayDeque;
 
 /**
@@ -57,11 +56,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 	
 	/** The listener to be notified on complete checkpoints */
-	private EventListener<CheckpointBarrier> checkpointHandler;
+	private StatefulTask toNotifyOnCheckpoint;
 	
 	/** The highest checkpoint ID encountered so far */
 	private long latestPendingCheckpointID = -1;
-	
+
+	// ------------------------------------------------------------------------
 	
 	public BarrierTracker(InputGate inputGate) {
 		this.inputGate = inputGate;
@@ -70,7 +70,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	}
 
 	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+	public BufferOrEvent getNextNonBlocked() throws Exception {
 		while (true) {
 			BufferOrEvent next = inputGate.getNextBufferOrEvent();
 			if (next == null) {
@@ -86,12 +86,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	}
 
 	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
+	public void registerCheckpointEventHandler(StatefulTask toNotifyOnCheckpoint) {
+		if (this.toNotifyOnCheckpoint == null) {
+			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 		}
 		else {
-			throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
+			throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee");
 		}
 	}
 
@@ -105,22 +105,29 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		return pendingCheckpoints.isEmpty();
 	}
 
-	private void processBarrier(CheckpointBarrier receivedBarrier) {
+	@Override
+	public long getAlignmentDurationNanos() {
+		// this one does not do alignment at all
+		return 0L;
+	}
+
+	private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception {
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
+			if (toNotifyOnCheckpoint != null) {
+				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+						receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
 			}
 			return;
 		}
-		
+
 		// general path for multiple input channels
 		final long barrierId = receivedBarrier.getId();
 
 		// find the checkpoint barrier in the queue of bending barriers
 		CheckpointBarrierCount cbc = null;
 		int pos = 0;
-		
+
 		for (CheckpointBarrierCount next : pendingCheckpoints) {
 			if (next.checkpointId == barrierId) {
 				cbc = next;
@@ -128,7 +135,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 			}
 			pos++;
 		}
-		
+
 		if (cbc != null) {
 			// add one to the count to that barrier and check for completion
 			int numBarriersNew = cbc.incrementBarrierCount();
@@ -141,8 +148,9 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 				}
 				
 				// notify the listener
-				if (checkpointHandler != null) {
-					checkpointHandler.onEvent(receivedBarrier);
+				if (toNotifyOnCheckpoint != null) {
+					toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+							receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
 				}
 			}
 		}
@@ -182,7 +190,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		public int incrementBarrierCount() {
 			return ++barrierCount;
 		}
-		
+
 		@Override
 		public int hashCode() {
 			return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount; 

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 1b38a56..5a8a4cd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -85,8 +85,8 @@ public class BufferSpiller {
 	/** A counter, to created numbered spill files */
 	private int fileCounter;
 	
-	/** A flag to check whether the spiller has written since the last roll over */
-	private boolean hasWritten;
+	/** The number of bytes written since the last roll over */
+	private long bytesWritten;
 	
 	/**
 	 * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
@@ -124,7 +124,6 @@ public class BufferSpiller {
 	 * @throws IOException Thrown, if the buffer of event could not be spilled.
 	 */
 	public void add(BufferOrEvent boe) throws IOException {
-		hasWritten = true;
 		try {
 			ByteBuffer contents;
 			if (boe.isBuffer()) {
@@ -133,6 +132,7 @@ public class BufferSpiller {
 			}
 			else {
 				contents = EventSerializer.toSerializedEvent(boe.getEvent());
+				
 			}
 			
 			headBuffer.clear();
@@ -140,7 +140,9 @@ public class BufferSpiller {
 			headBuffer.putInt(contents.remaining());
 			headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
 			headBuffer.flip();
-			
+
+			bytesWritten += (headBuffer.remaining() + contents.remaining());
+
 			sources[1] = contents;
 			currentChannel.write(sources);
 		}
@@ -186,10 +188,10 @@ public class BufferSpiller {
 	}
 	
 	private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
-		if (!hasWritten) {
+		if (bytesWritten == 0) {
 			return null;
 		}
-		
+
 		ByteBuffer buf;
 		if (newBuffer) {
 			buf = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
@@ -197,16 +199,16 @@ public class BufferSpiller {
 		} else {
 			buf = readBuffer;
 		}
-		
+
 		// create a reader for the spilled data
 		currentChannel.position(0L);
 		SpilledBufferOrEventSequence seq = 
 				new SpilledBufferOrEventSequence(currentSpillFile, currentChannel, buf, pageSize);
-		
+
 		// create ourselves a new spill file
 		createSpillingChannel();
-		
-		hasWritten = false;
+
+		bytesWritten = 0L;
 		return seq;
 	}
 
@@ -225,6 +227,14 @@ public class BufferSpiller {
 		}
 	}
 
+	/**
+	 * Gets the number of bytes written in the current spill file.
+	 * @return the number of bytes written in the current spill file
+	 */
+	public long getBytesWritten() {
+		return bytesWritten;
+	}
+
 	// ------------------------------------------------------------------------
 	//  For testing
 	// ------------------------------------------------------------------------
@@ -255,16 +265,16 @@ public class BufferSpiller {
 	 * method {@link #getNext()}.
 	 */
 	public static class SpilledBufferOrEventSequence {
-		
+
 		/** Header is "channel index" (4 bytes) + length (4 bytes) + buffer/event (1 byte) */
 		private static final int HEADER_LENGTH = 9;
 
 		/** The file containing the data */
 		private final File file;
-		
+
 		/** The file channel to draw the data from */
 		private final FileChannel fileChannel;
-		
+
 		/** The byte buffer for bulk reading */
 		private final ByteBuffer buffer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 5aa2030..56859fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,8 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 import java.io.IOException;
 
@@ -39,18 +38,22 @@ public interface CheckpointBarrierHandler {
 	 * has been determined to be finished.
 	 * 
 	 * @return The next BufferOrEvent, or {@code null}, if the stream is finished.
-	 * @throws java.io.IOException Thrown if the network or local disk I/O fails.
-	 * @throws java.lang.InterruptedException Thrown if the thread is interrupted while blocking during
-	 *                                        waiting for the next BufferOrEvent to become available.
+	 * 
+	 * @throws IOException Thrown if the network or local disk I/O fails.
+	 * 
+	 * @throws InterruptedException Thrown if the thread is interrupted while blocking during
+	 *                              waiting for the next BufferOrEvent to become available.
+	 * @throws Exception Thrown if the checkpoint that is started as the result of receiving
+	 *                   the last checkpoint barrier fails.
 	 */
-	BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+	BufferOrEvent getNextNonBlocked() throws Exception;
 
 	/**
-	 * Registers the given event handler to be notified on successful checkpoints.
+	 * Registers the task to be notified once all checkpoint barriers have been received for a checkpoint.
 	 * 
-	 * @param checkpointHandler The handler to register.
+	 * @param task The task to notify
 	 */
-	void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+	void registerCheckpointEventHandler(StatefulTask task);
 
 	/**
 	 * Cleans up all internally held resources.
@@ -64,4 +67,13 @@ public interface CheckpointBarrierHandler {
 	 * @return {@code True}, if no data is buffered internally, {@code false} otherwise.
 	 */
 	boolean isEmpty();
+
+	/**
+	 * Gets the time that the latest alignment took, in nanoseconds.
+	 * If there is currently an alignment in progress, it will return the time spent in the
+	 * current alignment so far.
+	 * 
+	 * @return The duration in nanoseconds
+	 */
+	long getAlignmentDurationNanos();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index d11990e..85e9297 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -85,10 +84,10 @@ public class StreamInputProcessor<IN> {
 
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
-								EventListener<CheckpointBarrier> checkpointListener,
-								CheckpointingMode checkpointMode,
-								IOManager ioManager,
-								boolean enableWatermarkMultiplexing) throws IOException {
+			StatefulTask checkpointedTask,
+			CheckpointingMode checkpointMode,
+			IOManager ioManager,
+			boolean enableWatermarkMultiplexing) throws IOException {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
@@ -102,8 +101,8 @@ public class StreamInputProcessor<IN> {
 			throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
 		}
 		
-		if (checkpointListener != null) {
-			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+		if (checkpointedTask != null) {
+			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
 		
 		if (enableWatermarkMultiplexing) {
@@ -215,7 +214,7 @@ public class StreamInputProcessor<IN> {
 	 * Sets the metric group for this StreamInputProcessor.
 	 * 
 	 * @param metrics metric group
-     */
+	 */
 	public void setMetricGroup(IOMetricGroup metrics) {
 		metrics.gauge("currentLowWatermark", new Gauge<Long>() {
 			@Override
@@ -223,6 +222,13 @@ public class StreamInputProcessor<IN> {
 				return lastEmittedWatermark;
 			}
 		});
+
+		metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return barrierHandler.getAlignmentDurationNanos();
+			}
+		});
 	}
 	
 	public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ce764b7..70ce783 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -95,7 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			Collection<InputGate> inputGates2,
 			TypeSerializer<IN1> inputSerializer1,
 			TypeSerializer<IN2> inputSerializer2,
-			EventListener<CheckpointBarrier> checkpointListener,
+			StatefulTask checkpointedTask,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
 			boolean enableWatermarkMultiplexing) throws IOException {
@@ -112,8 +111,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
 		}
 		
-		if (checkpointListener != null) {
-			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+		if (checkpointedTask != null) {
+			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
 		
 		if (enableWatermarkMultiplexing) {
@@ -294,6 +293,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 				return Math.min(lastEmittedWatermark1, lastEmittedWatermark2);
 			}
 		});
+
+		metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return barrierHandler.getAlignmentDurationNanos();
+			}
+		});
 	}
 	
 	public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 938d8c1..d6d2fb5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -43,7 +43,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		if (numberOfInputs > 0) {
 			InputGate[] inputGates = getEnvironment().getAllInputGates();
 			inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
-					getCheckpointBarrierListener(), 
+					this, 
 					configuration.getCheckpointMode(),
 					getEnvironment().getIOManager(),
 					isSerializingTimestamps());

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ff074b7..7976f01 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -589,7 +587,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	@Override
 	public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
 		try {
-			return performCheckpoint(checkpointId, timestamp);
+			return performCheckpoint(checkpointId, timestamp, 0L, 0L);
 		}
 		catch (Exception e) {
 			// propagate exceptions only if the task is still in "running" state
@@ -601,11 +599,31 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		}
 	}
 
-	private boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+	@Override
+	public void triggerCheckpointOnBarrier(
+			long checkpointId, long timestamp, long bytesAligned, long alignmentDurationNanos) throws Exception {
+
+		try {
+			performCheckpoint(checkpointId, timestamp, bytesAligned, alignmentDurationNanos);
+		}
+		catch (CancelTaskException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new Exception("Error while performing a checkpoint", e);
+		}
+	}
+
+	private boolean performCheckpoint(
+			long checkpointId, long timestamp, long bytesBufferedAlignment, long alignmentDurationNanos) throws Exception {
+
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
+
 		synchronized (lock) {
 			if (isRunning) {
 
+				final long startOfSyncPart = System.nanoTime();
+
 				// Since both state checkpointing and downstream barrier emission occurs in this
 				// lock scope, they are an atomic operation regardless of the order in which they occur.
 				// Given this, we immediately emit the checkpoint barriers, so the downstream operators
@@ -654,13 +672,20 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 				LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
 
+				final long endOfSyncPart = System.nanoTime();
+				final long syncDurationMillis = (endOfSyncPart - startOfSyncPart) / 1_000_000;
+
 				AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
 						"checkpoint-" + checkpointId + "-" + timestamp,
 						this,
 						cancelables,
 						chainedStateHandles,
 						keyGroupsStateHandleFuture,
-						checkpointId);
+						checkpointId,
+						bytesBufferedAlignment,
+						alignmentDurationNanos,
+						syncDurationMillis,
+						endOfSyncPart);
 
 				synchronized (cancelables) {
 					cancelables.add(asyncCheckpointRunnable);
@@ -851,29 +876,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return getName();
 	}
 
-	final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
-		return new EventListener<CheckpointBarrier>() {
-			@Override
-			public void onEvent(CheckpointBarrier barrier) {
-				try {
-					performCheckpoint(barrier.getId(), barrier.getTimestamp());
-				}
-				catch (CancelTaskException e) {
-					throw e;
-				}
-				catch (Exception e) {
-					throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
-				}
-			}
-		};
-	}
-
 	// ------------------------------------------------------------------------
 	
 	private static class AsyncCheckpointRunnable implements Runnable, Closeable {
@@ -890,13 +898,25 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 		private final String name;
 
+		private final long bytesBufferedInAlignment;
+
+		private final long alignmentDurationNanos;
+
+		private final long syncDurationMillies;
+
+		private final long asyncStartNanos;
+
 		AsyncCheckpointRunnable(
 				String name,
 				StreamTask<?, ?> owner,
 				Set<Closeable> cancelables,
 				ChainedStateHandle<StreamStateHandle> chainedStateHandles,
 				RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture,
-				long checkpointId) {
+				long checkpointId,
+				long bytesBufferedInAlignment,
+				long alignmentDurationNanos,
+				long syncDurationMillies,
+				long asyncStartNanos) {
 
 			this.name = name;
 			this.owner = owner;
@@ -904,6 +924,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			this.chainedStateHandles = chainedStateHandles;
 			this.keyGroupsStateHandleFuture = keyGroupsStateHandleFuture;
 			this.checkpointId = checkpointId;
+			this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+			this.alignmentDurationNanos = alignmentDurationNanos;
+			this.syncDurationMillies = syncDurationMillies;
+			this.asyncStartNanos = asyncStartNanos;
 		}
 
 		@Override
@@ -925,19 +949,26 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					}
 				}
 
+				final long asyncEndNanos = System.nanoTime();
+				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
+
 				if (chainedStateHandles.isEmpty() && keyedStates.isEmpty()) {
-					owner.getEnvironment().acknowledgeCheckpoint(checkpointId);
+					owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+							syncDurationMillies, asyncDurationMillis,
+							bytesBufferedInAlignment, alignmentDurationNanos);
 				} else  {
-					owner.getEnvironment().acknowledgeCheckpoint(checkpointId, chainedStateHandles, keyedStates);
+					owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+							chainedStateHandles, keyedStates,
+							syncDurationMillies, asyncDurationMillis,
+							bytesBufferedInAlignment, alignmentDurationNanos);
 				}
 
-				if(LOG.isDebugEnabled()) {
+				if (LOG.isDebugEnabled()) {
 					LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}. Returning handles on " +
 							"keyed states {}.", checkpointId, name, keyedStates);
 				}
 			}
 			catch (Exception e) {
-
 				// registers the exception and tries to fail the whole task
 				AsynchronousException asyncException = new AsynchronousException(e);
 				owner.registerAsyncException(asyncException);

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index c3305eb..9252063 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -68,7 +68,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 	
 		this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
 				inputDeserializer1, inputDeserializer2,
-				getCheckpointBarrierListener(),
+				this,
 				configuration.getCheckpointMode(),
 				getEnvironment().getIOManager(),
 				isSerializingTimestamps());

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d4fdc59..b549ef8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -26,7 +25,10 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.junit.AfterClass;
@@ -35,6 +37,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -86,7 +89,9 @@ public class BarrierBufferTest {
 			for (BufferOrEvent boe : sequence) {
 				assertEquals(boe, buffer.getNextNonBlocked());
 			}
-			
+
+			assertEquals(0L, buffer.getAlignmentDurationNanos());
+
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
 			
@@ -120,6 +125,8 @@ public class BarrierBufferTest {
 				assertEquals(boe, buffer.getNextNonBlocked());
 			}
 
+			assertEquals(0L, buffer.getAlignmentDurationNanos());
+
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
 
@@ -222,13 +229,15 @@ public class BarrierBufferTest {
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
 			handler.setNextExpectedCheckpointId(1L);
-			
+
 			// pre checkpoint 1
 			check(sequence[0], buffer.getNextNonBlocked());
 			check(sequence[1], buffer.getNextNonBlocked());
 			check(sequence[2], buffer.getNextNonBlocked());
 			assertEquals(1L, handler.getNextExpectedCheckpointId());
 
+			long startTs = System.nanoTime();
+
 			// blocking while aligning for checkpoint 1
 			check(sequence[7], buffer.getNextNonBlocked());
 			assertEquals(1L, handler.getNextExpectedCheckpointId());
@@ -236,6 +245,8 @@ public class BarrierBufferTest {
 			// checkpoint 1 done, returning buffered data
 			check(sequence[5], buffer.getNextNonBlocked());
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+
 			check(sequence[6], buffer.getNextNonBlocked());
 
 			// pre checkpoint 2
@@ -245,10 +256,13 @@ public class BarrierBufferTest {
 			check(sequence[12], buffer.getNextNonBlocked());
 			check(sequence[13], buffer.getNextNonBlocked());
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
-			
+
 			// checkpoint 2 barriers come together
+			startTs = System.nanoTime();
 			check(sequence[17], buffer.getNextNonBlocked());
 			assertEquals(3L, handler.getNextExpectedCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+
 			check(sequence[18], buffer.getNextNonBlocked());
 
 			// checkpoint 3 starts, data buffered
@@ -257,7 +271,7 @@ public class BarrierBufferTest {
 			check(sequence[21], buffer.getNextNonBlocked());
 
 			// checkpoint 4 happens without extra data
-			
+
 			// pre checkpoint 5
 			check(sequence[27], buffer.getNextNonBlocked());
 			assertEquals(5L, handler.getNextExpectedCheckpointId());
@@ -301,7 +315,7 @@ public class BarrierBufferTest {
 			BufferOrEvent[] sequence = {
 					createBuffer(0), createBuffer(1), createBuffer(2),
 					createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-					
+
 					createBuffer(2), createBuffer(1), createBuffer(0),
 					createBarrier(2, 1),
 					createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
@@ -327,12 +341,14 @@ public class BarrierBufferTest {
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
 			check(sequence[7], buffer.getNextNonBlocked());
 			check(sequence[8], buffer.getNextNonBlocked());
-			
+
 			// checkpoint 2 alignment
+			long startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
 			check(sequence[14], buffer.getNextNonBlocked());
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[19], buffer.getNextNonBlocked());
+			validateAlignmentTime(startTs, buffer);
 
 			// end of stream: remaining buffered contents
 			check(sequence[10], buffer.getNextNonBlocked());
@@ -343,7 +359,7 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
-			
+
 			buffer.cleanup();
 
 			checkNoTempFilesRemain();
@@ -389,7 +405,7 @@ public class BarrierBufferTest {
 					createBarrier(3, 2),
 					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
 					createBarrier(6, 1),
-					
+
 					// complete checkpoint 4, checkpoint 5 remains not fully triggered
 					createBarrier(4, 2),
 					createBuffer(2),
@@ -419,10 +435,13 @@ public class BarrierBufferTest {
 
 			// alignment of checkpoint 2 - buffering also some barriers for
 			// checkpoints 3 and 4
+			long startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
 			check(sequence[20], buffer.getNextNonBlocked());
 			check(sequence[23], buffer.getNextNonBlocked());
-			
+
+			validateAlignmentTime(startTs, buffer);
+
 			// checkpoint 2 completed
 			check(sequence[12], buffer.getNextNonBlocked());
 			check(sequence[25], buffer.getNextNonBlocked());
@@ -613,17 +632,21 @@ public class BarrierBufferTest {
 			check(sequence[19], buffer.getNextNonBlocked());
 			check(sequence[21], buffer.getNextNonBlocked());
 
+			long startTs = System.nanoTime();
+
 			// checkpoint 2 aborted, checkpoint 4 started. replay buffered
 			check(sequence[12], buffer.getNextNonBlocked());
 			assertEquals(4L, buffer.getCurrentCheckpointId());
 			check(sequence[16], buffer.getNextNonBlocked());
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[22], buffer.getNextNonBlocked());
-			
+
 			// align checkpoint 4 remainder
 			check(sequence[25], buffer.getNextNonBlocked());
 			check(sequence[26], buffer.getNextNonBlocked());
-			
+
+			validateAlignmentTime(startTs, buffer);
+
 			// checkpoint 4 aborted (due to end of partition)
 			check(sequence[24], buffer.getNextNonBlocked());
 			check(sequence[27], buffer.getNextNonBlocked());
@@ -926,12 +949,17 @@ public class BarrierBufferTest {
 			}
 		}
 	}
-	
+
+	private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+		final long elapsed = System.nanoTime() - startTimestamp;
+		assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 
-	private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
+	private static class ValidatingCheckpointHandler implements StatefulTask {
 		
 		private long nextExpectedCheckpointId = -1L;
 
@@ -944,11 +972,33 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public void onEvent(CheckpointBarrier barrier) {
-			assertNotNull(barrier);
-			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
+		public void setInitialState(
+				ChainedStateHandle<StreamStateHandle> chainedState,
+				List<KeyGroupsStateHandle> keyGroupsState) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(
+				long checkpointId, long timestamp,
+				long bytesAligned, long alignmentTimeNanos) throws Exception {
+
+			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
+			assertTrue(timestamp > 0);
+			assertTrue(bytesAligned >= 0);
+			assertTrue(alignmentTimeNanos >= 0);
+
 			nextExpectedCheckpointId++;
 		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index b9b6e5f..314dcc4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -22,12 +22,16 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.*;
 
@@ -346,22 +350,42 @@ public class BarrierTrackerTest {
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 	
-	private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
+	private static class CheckpointSequenceValidator implements StatefulTask {
 
 		private final long[] checkpointIDs;
-		
+
 		private int i = 0;
 
 		private CheckpointSequenceValidator(long... checkpointIDs) {
 			this.checkpointIDs = checkpointIDs;
 		}
-		
+
 		@Override
-		public void onEvent(CheckpointBarrier barrier) {
+		public void setInitialState(
+				ChainedStateHandle<StreamStateHandle> chainedState,
+				List<KeyGroupsStateHandle> keyGroupsState) throws Exception {
+
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(
+				long checkpointId, long timestamp,
+				long bytesAligned, long alignmentTimeNanos) throws Exception {
+
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-			assertNotNull(barrier);
-			assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
+			assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointId);
+			assertTrue(timestamp > 0);
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 7ef0080..88fb383 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -478,27 +478,32 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		List<KeyGroupsStateHandle> getKeyGroupStates() {
 			List<KeyGroupsStateHandle> result = new ArrayList<>();
-			for (int i = 0; i < keyGroupStates.size(); i++) {
-				if (keyGroupStates.get(i) != null) {
-					result.add(keyGroupStates.get(i));
+			for (KeyGroupsStateHandle keyGroupState : keyGroupStates) {
+				if (keyGroupState != null) {
+					result.add(keyGroupState);
 				}
 			}
 			return result;
 		}
 
-		AcknowledgeStreamMockEnvironment(Configuration jobConfig, Configuration taskConfig,
-		                                 ExecutionConfig executionConfig, long memorySize,
-		                                 MockInputSplitProvider inputSplitProvider, int bufferSize) {
+		AcknowledgeStreamMockEnvironment(
+				Configuration jobConfig, Configuration taskConfig,
+				ExecutionConfig executionConfig, long memorySize,
+				MockInputSplitProvider inputSplitProvider, int bufferSize) {
 			super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize);
 		}
 
 
 		@Override
-		public void acknowledgeCheckpoint(long checkpointId, ChainedStateHandle<StreamStateHandle> state,
-		                                  List<KeyGroupsStateHandle> keyGroupStates) {
+		public void acknowledgeCheckpoint(
+				long checkpointId,
+				ChainedStateHandle<StreamStateHandle> state, List<KeyGroupsStateHandle> keyGroupStates,
+				long syncDuration, long asymcDuration, long alignmentByte, long alignmentDuration) {
+
 			this.checkpointId = checkpointId;
 			this.state = state;
 			this.keyGroupStates = keyGroupStates;
+
 			checkpointLatch.trigger();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 0901b32..2036f69 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -308,14 +308,18 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId,
-			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles) {
-
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override


[3/3] flink git commit: [FLINK-4696] [core] Limit number of Akka threads in local minicluster setups

Posted by se...@apache.org.
[FLINK-4696] [core] Limit number of Akka threads in local minicluster setups

Since Flink uses a rather small number of actors, not too many actor dispatcher threads are needed.
To prevent mini cluster setups on multi-core CPUs (32 or 64 cores) from spawning too many threads,
this limits the number of dispatcher threads for mini clusters.

For proper Flink deployments, the threads are not limited by this change.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ea9284d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ea9284d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ea9284d

Branch: refs/heads/master
Commit: 6ea9284d29ec79576f073441a5de681019720ab0
Parents: e5d62da
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 27 14:21:20 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 27 14:58:41 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/akka/AkkaUtils.scala    | 19 +++++++++++++++++++
 .../runtime/minicluster/FlinkMiniCluster.scala   | 10 ++++++----
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ea9284d/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 7aa75c0..bd3af33 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -189,6 +189,25 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
+  def testDispatcherConfig: Config = {
+    val config =
+      s"""
+         |akka {
+         |  actor {
+         |    default-dispatcher {
+         |      fork-join-executor {
+         |        parallelism-factor = 1.0
+         |        parallelism-min = 1
+         |        parallelism-max = 4
+         |      }
+         |    }
+         |  }
+         |}
+      """.stripMargin
+
+    ConfigFactory.parseString(config)
+  }
+
   /**
    * Creates a Akka config for a remote actor system listening on port on the network interface
    * identified by hostname.

http://git-wip-us.apache.org/repos/asf/flink/blob/6ea9284d/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0178bd3..a263f66 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent._
-import scala.concurrent.forkjoin.ForkJoinPool
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -266,17 +265,20 @@ abstract class FlinkMiniCluster(
 
   def startResourceManagerActorSystem(index: Int): ActorSystem = {
     val config = getResourceManagerAkkaConfig(index)
-    AkkaUtils.createActorSystem(config)
+    val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config)
+    AkkaUtils.createActorSystem(testConfig)
   }
 
   def startJobManagerActorSystem(index: Int): ActorSystem = {
     val config = getJobManagerAkkaConfig(index)
-    AkkaUtils.createActorSystem(config)
+    val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config)
+    AkkaUtils.createActorSystem(testConfig)
   }
 
   def startTaskManagerActorSystem(index: Int): ActorSystem = {
     val config = getTaskManagerAkkaConfig(index)
-    AkkaUtils.createActorSystem(config)
+    val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config)
+    AkkaUtils.createActorSystem(testConfig)
   }
 
   def startJobClientActorSystem(jobID: JobID): ActorSystem = {