You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/06 15:04:43 UTC

[4/4] flink git commit: [FLINK-4730] Introducing CheckpointMetaData

[FLINK-4730] Introducing CheckpointMetaData

This closes #2583.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dac7ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dac7ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dac7ad0

Branch: refs/heads/master
Commit: 0dac7ad00a25c4b038b57009f41f5512f8ce73c3
Parents: 241d47b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Oct 2 14:19:37 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 6 17:04:12 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  15 +--
 .../runtime/checkpoint/CheckpointMetaData.java  | 113 ++++++++++++++++
 .../runtime/checkpoint/CheckpointMetrics.java   | 131 +++++++++++++++++++
 .../flink/runtime/execution/Environment.java    |  41 ++----
 .../runtime/jobgraph/tasks/StatefulTask.java    |  16 +--
 .../checkpoint/AcknowledgeCheckpoint.java       |  56 +++-----
 .../ActorGatewayCheckpointResponder.java        |  22 ++--
 .../taskmanager/CheckpointResponder.java        |  30 ++---
 .../runtime/taskmanager/RuntimeEnvironment.java |  28 ++--
 .../apache/flink/runtime/taskmanager/Task.java  |  10 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 118 ++++++++++-------
 .../checkpoint/CheckpointStateRestoreTest.java  |  24 ++--
 .../jobmanager/JobManagerHARecoveryTest.java    |  13 +-
 .../messages/CheckpointMessagesTest.java        |   5 +-
 .../operators/testutils/DummyEnvironment.java   |  17 +--
 .../operators/testutils/MockEnvironment.java    |  15 +--
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  16 +--
 .../kafka/FlinkKafkaConsumerBase.java           |   2 +
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  13 +-
 .../streaming/runtime/io/BarrierBuffer.java     |  20 +--
 .../streaming/runtime/io/BarrierTracker.java    |  22 +++-
 .../streaming/runtime/tasks/StreamTask.java     |  71 +++++-----
 .../streaming/runtime/io/BarrierBufferTest.java |  17 ++-
 .../runtime/io/BarrierTrackerTest.java          |  11 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |  12 +-
 .../runtime/tasks/SourceStreamTaskTest.java     |   6 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  19 +--
 27 files changed, 515 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 2ebd84a..8f58075 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
@@ -134,14 +135,10 @@ public class RocksDBAsyncSnapshotTest {
 
 			@Override
 			public void acknowledgeCheckpoint(
-					long checkpointId,
-					CheckpointStateHandles checkpointStateHandles,
-					long synchronousDurationMillis, long asynchronousDurationMillis,
-					long bytesBufferedInAlignment, long alignmentDurationNanos) {
+					CheckpointMetaData checkpointMetaData,
+					CheckpointStateHandles checkpointStateHandles) {
 
-				super.acknowledgeCheckpoint(checkpointId, checkpointStateHandles,
-						synchronousDurationMillis, asynchronousDurationMillis,
-						bytesBufferedInAlignment, alignmentDurationNanos);
+				super.acknowledgeCheckpoint(checkpointMetaData);
 
 				// block on the latch, to verify that triggerCheckpoint returns below,
 				// even though the async checkpoint would not finish
@@ -171,7 +168,7 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		task.triggerCheckpoint(42, 17);
+		task.triggerCheckpoint(new CheckpointMetaData(42, 17));
 
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 
@@ -250,7 +247,7 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		task.triggerCheckpoint(42, 17);
+		task.triggerCheckpoint(new CheckpointMetaData(42, 17));
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 		BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
 		task.cancel();

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
new file mode 100644
index 0000000..6f117f2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Encapsulates all the meta data for a checkpoint.
+ */
+public class CheckpointMetaData implements Serializable {
+
+	private static final long serialVersionUID = -2387652345781312442L;
+
+	/** The ID of the checkpoint */
+	private final long checkpointId;
+
+	/** The timestamp of the checkpoint */
+	private final long timestamp;
+
+	private final CheckpointMetrics metrics;
+
+	public CheckpointMetaData(long checkpointId, long timestamp) {
+		this.checkpointId = checkpointId;
+		this.timestamp = timestamp;
+		this.metrics = new CheckpointMetrics();
+	}
+
+	public CheckpointMetaData(
+			long checkpointId,
+			long timestamp,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
+		this.checkpointId = checkpointId;
+		this.timestamp = timestamp;
+		this.metrics = new CheckpointMetrics(
+				bytesBufferedInAlignment,
+				alignmentDurationNanos,
+				synchronousDurationMillis,
+				asynchronousDurationMillis);
+	}
+
+	public CheckpointMetrics getMetrics() {
+		return metrics;
+	}
+
+	public CheckpointMetaData setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
+		Preconditions.checkArgument(bytesBufferedInAlignment >= 0);
+		this.metrics.setBytesBufferedInAlignment(bytesBufferedInAlignment);
+		return this;
+	}
+
+	public CheckpointMetaData setAlignmentDurationNanos(long alignmentDurationNanos) {
+		Preconditions.checkArgument(alignmentDurationNanos >= 0);
+		this.metrics.setAlignmentDurationNanos(alignmentDurationNanos);
+		return this;
+	}
+
+	public CheckpointMetaData setSyncDurationMillis(long syncDurationMillis) {
+		Preconditions.checkArgument(syncDurationMillis >= 0);
+		this.metrics.setSyncDurationMillis(syncDurationMillis);
+		return this;
+	}
+
+	public CheckpointMetaData setAsyncDurationMillis(long asyncDurationMillis) {
+		Preconditions.checkArgument(asyncDurationMillis >= 0);
+		this.metrics.setAsyncDurationMillis(asyncDurationMillis);
+		return this;
+	}
+
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public long getBytesBufferedInAlignment() {
+		return metrics.getBytesBufferedInAlignment();
+	}
+
+	public long getAlignmentDurationNanos() {
+		return metrics.getAlignmentDurationNanos();
+	}
+
+	public long getSyncDurationMillis() {
+		return metrics.getSyncDurationMillis();
+	}
+
+	public long getAsyncDurationMillis() {
+		return metrics.getAsyncDurationMillis();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
new file mode 100644
index 0000000..4155290
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+public class CheckpointMetrics implements Serializable {
+
+	/**
+	 * The number of bytes that were buffered during the checkpoint alignment phase
+	 */
+	private long bytesBufferedInAlignment;
+
+	/**
+	 * The duration (in nanoseconds) that the stream alignment for the checkpoint took
+	 */
+	private long alignmentDurationNanos;
+
+	/** The duration (in milliseconds) of the synchronous part of the operator checkpoint */
+	private long syncDurationMillis;
+
+	/** The duration (in milliseconds) of the asynchronous part of the operator checkpoint */
+	private long asyncDurationMillis;
+
+	public CheckpointMetrics() {
+		this(-1L, -1L, -1L, -1L);
+	}
+
+	public CheckpointMetrics(
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos,
+			long syncDurationMillis,
+			long asyncDurationMillis) {
+
+		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+		this.alignmentDurationNanos = alignmentDurationNanos;
+		this.syncDurationMillis = syncDurationMillis;
+		this.asyncDurationMillis = asyncDurationMillis;
+	}
+
+	public long getBytesBufferedInAlignment() {
+		return bytesBufferedInAlignment;
+	}
+
+	public void setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
+		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+	}
+
+	public long getAlignmentDurationNanos() {
+		return alignmentDurationNanos;
+	}
+
+	public void setAlignmentDurationNanos(long alignmentDurationNanos) {
+		this.alignmentDurationNanos = alignmentDurationNanos;
+	}
+
+	public long getSyncDurationMillis() {
+		return syncDurationMillis;
+	}
+
+	public void setSyncDurationMillis(long syncDurationMillis) {
+		this.syncDurationMillis = syncDurationMillis;
+	}
+
+	public long getAsyncDurationMillis() {
+		return asyncDurationMillis;
+	}
+
+	public void setAsyncDurationMillis(long asyncDurationMillis) {
+		this.asyncDurationMillis = asyncDurationMillis;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		CheckpointMetrics that = (CheckpointMetrics) o;
+
+		if (bytesBufferedInAlignment != that.bytesBufferedInAlignment) {
+			return false;
+		}
+		if (alignmentDurationNanos != that.alignmentDurationNanos) {
+			return false;
+		}
+		if (syncDurationMillis != that.syncDurationMillis) {
+			return false;
+		}
+		return asyncDurationMillis == that.asyncDurationMillis;
+
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (bytesBufferedInAlignment ^ (bytesBufferedInAlignment >>> 32));
+		result = 31 * result + (int) (alignmentDurationNanos ^ (alignmentDurationNanos >>> 32));
+		result = 31 * result + (int) (syncDurationMillis ^ (syncDurationMillis >>> 32));
+		result = 31 * result + (int) (asyncDurationMillis ^ (asyncDurationMillis >>> 32));
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "CheckpointMetrics{" +
+				"bytesBufferedInAlignment=" + bytesBufferedInAlignment +
+				", alignmentDurationNanos=" + alignmentDurationNanos +
+				", syncDurationMillis=" + syncDurationMillis +
+				", asyncDurationMillis=" + asyncDurationMillis +
+				'}';
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index f6cde95..cbbeec7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -19,20 +19,21 @@
 package org.apache.flink.runtime.execution;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KvState;
@@ -161,47 +162,21 @@ public interface Environment {
 	 * to for the checkpoint with the give checkpoint-ID. This method does not include
 	 * any state in the checkpoint.
 	 * 
-	 * @param checkpointId
-	 *             The ID of the checkpoint.
-	 * @param synchronousDurationMillis
-	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
-	 * @param asynchronousDurationMillis
-	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
-	 * @param bytesBufferedInAlignment
-	 *             The number of bytes that were buffered during the checkpoint alignment phase
-	 * @param alignmentDurationNanos
-	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took   
+	 * @param checkpointMetaData the meta data for this checkpoint
 	 */
-	void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos);
+	void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData);
 
 	/**
 	 * Confirms that the invokable has successfully completed all required steps for
 	 * the checkpoint with the give checkpoint-ID. This method does include
 	 * the given state in the checkpoint.
 	 *
-	 * @param checkpointId The ID of the checkpoint.
 	 * @param checkpointStateHandles All state handles for the checkpointed state
-	 * @param synchronousDurationMillis
-	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
-	 * @param asynchronousDurationMillis
-	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
-	 * @param bytesBufferedInAlignment
-	 *             The number of bytes that were buffered during the checkpoint alignment phase
-	 * @param alignmentDurationNanos
-	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took   
+	 * @param checkpointMetaData the meta data for this checkpoint
 	 */
 	void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos);
+			CheckpointMetaData checkpointMetaData,
+			CheckpointStateHandles checkpointStateHandles);
 
 	/**
 	 * Marks task execution failed for an external reason (a reason other than the task code itself

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 55e3e09..e1d15e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -52,29 +53,24 @@ public interface StatefulTask {
 	 * 
 	 * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
 	 * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
-	 * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(long, long, long, long)}
+	 * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData)}
 	 * method.
 	 *
-	 * @param checkpointId The ID of the checkpoint, strictly incrementing.
-	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+	 * @param checkpointMetaData Meta data for this checkpoint
 	 *
 	 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
 	 */
-	boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
+	boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception;
 
 	/**
 	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
 	 * barriers on all input streams.
 	 * 
-	 * @param checkpointId The ID of the checkpoint, strictly incrementing.
-	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
-	 * @param bytesAligned The number of bytes that were buffered during the alignment of the streams.
-	 * @param alignmentTimeNanos The time that the stream alignment took, in nanoseconds.   
+	 * @param checkpointMetaData Meta data for this checkpoint
 	 * 
 	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
 	 */
-	void triggerCheckpointOnBarrier(
-			long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception;
+	void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception;
 
 	/**
 	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index e95e7b3..ac14d3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 
@@ -39,59 +40,32 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 
 	private final CheckpointStateHandles checkpointStateHandles;
 
-	/** The duration (in milliseconds) that the synchronous part of the checkpoint took */
-	private final long synchronousDurationMillis;
-
-	/** The duration (in milliseconds) that the asynchronous part of the checkpoint took */
-	private final long asynchronousDurationMillis;
-
-	/** The number of bytes that were buffered during the checkpoint alignment phase */
-	private final long bytesBufferedInAlignment;
-
-	/** The duration (in nanoseconds) that the alignment phase of the task's checkpoint took */
-	private final long alignmentDurationNanos;
+	private final CheckpointMetaData checkpointMetaData;
 
 	// ------------------------------------------------------------------------
 
 	public AcknowledgeCheckpoint(
 			JobID job,
 			ExecutionAttemptID taskExecutionId,
-			long checkpointId) {
-		this(job, taskExecutionId, checkpointId, null);
+			CheckpointMetaData checkpointMetaData) {
+		this(job, taskExecutionId, checkpointMetaData, null);
 	}
 
 	public AcknowledgeCheckpoint(
 			JobID job,
 			ExecutionAttemptID taskExecutionId,
-			long checkpointId,
+			CheckpointMetaData checkpointMetaData,
 			CheckpointStateHandles checkpointStateHandles) {
-		this(job, taskExecutionId, checkpointId, checkpointStateHandles, -1L, -1L, -1L, -1L);
-	}
 
-	public AcknowledgeCheckpoint(
-			JobID job,
-			ExecutionAttemptID taskExecutionId,
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos) {
-
-		super(job, taskExecutionId, checkpointId);
+		super(job, taskExecutionId, checkpointMetaData.getCheckpointId());
 
 		this.checkpointStateHandles = checkpointStateHandles;
-
+		this.checkpointMetaData = checkpointMetaData;
 		// these may be "-1", in case the values are unknown or not set
-		checkArgument(synchronousDurationMillis >= -1);
-		checkArgument(asynchronousDurationMillis >= -1);
-		checkArgument(bytesBufferedInAlignment >= -1);
-		checkArgument(alignmentDurationNanos >= -1);
-
-		this.synchronousDurationMillis = synchronousDurationMillis;
-		this.asynchronousDurationMillis = asynchronousDurationMillis;
-		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
-		this.alignmentDurationNanos = alignmentDurationNanos;
+		checkArgument(checkpointMetaData.getSyncDurationMillis() >= -1);
+		checkArgument(checkpointMetaData.getAsyncDurationMillis() >= -1);
+		checkArgument(checkpointMetaData.getBytesBufferedInAlignment() >= -1);
+		checkArgument(checkpointMetaData.getAlignmentDurationNanos() >= -1);
 	}
 
 	// ------------------------------------------------------------------------
@@ -103,19 +77,19 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 	}
 
 	public long getSynchronousDurationMillis() {
-		return synchronousDurationMillis;
+		return checkpointMetaData.getSyncDurationMillis();
 	}
 
 	public long getAsynchronousDurationMillis() {
-		return asynchronousDurationMillis;
+		return checkpointMetaData.getAsyncDurationMillis();
 	}
 
 	public long getBytesBufferedInAlignment() {
-		return bytesBufferedInAlignment;
+		return checkpointMetaData.getBytesBufferedInAlignment();
 	}
 
 	public long getAlignmentDurationNanos() {
-		return alignmentDurationNanos;
+		return checkpointMetaData.getAlignmentDurationNanos();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index 8bf1127..6f1bf7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -41,18 +42,12 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
 	public void acknowledgeCheckpoint(
 			JobID jobID,
 			ExecutionAttemptID executionAttemptID,
-			long checkpointID,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData,
+			CheckpointStateHandles checkpointStateHandles) {
 
 		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
-				jobID, executionAttemptID, checkpointID,
-				checkpointStateHandles,
-				synchronousDurationMillis, asynchronousDurationMillis,
-				bytesBufferedInAlignment, alignmentDurationNanos);
+				jobID, executionAttemptID, checkpointMetaData,
+				checkpointStateHandles);
 
 		actorGateway.tell(message);
 	}
@@ -61,14 +56,13 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
 	public void declineCheckpoint(
 		JobID jobID,
 		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		long checkpointTimestamp) {
+		CheckpointMetaData checkpointMetaData) {
 
 		DeclineCheckpoint decline = new DeclineCheckpoint(
 			jobID,
 			executionAttemptID,
-			checkpointID,
-			checkpointTimestamp);
+			checkpointMetaData.getCheckpointId(),
+			checkpointMetaData.getTimestamp());
 
 		actorGateway.tell(decline);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index 698a7f4..4fa20e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 
@@ -34,40 +35,27 @@ public interface CheckpointResponder {
 	 *             Job ID of the running job
 	 * @param executionAttemptID
 	 *             Execution attempt ID of the running task
-	 * @param checkpointID
-	 *             Checkpoint ID of the checkpoint
-	 * @param checkpointStateHandles 
+	 * @param checkpointStateHandles
 	 *             State handles for the checkpoint
-	 * @param synchronousDurationMillis
-	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
-	 * @param asynchronousDurationMillis
-	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
-	 * @param bytesBufferedInAlignment
-	 *             The number of bytes that were buffered during the checkpoint alignment phase
-	 * @param alignmentDurationNanos
-	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took
+	 * @param checkpointMetaData
+	 *             Meta data for this checkpoint
+	 *
 	 */
 	void acknowledgeCheckpoint(
 		JobID jobID,
 		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		CheckpointStateHandles checkpointStateHandles,
-		long synchronousDurationMillis,
-		long asynchronousDurationMillis,
-		long bytesBufferedInAlignment,
-		long alignmentDurationNanos);
+		CheckpointMetaData checkpointMetaData,
+		CheckpointStateHandles checkpointStateHandles);
 
 	/**
 	 * Declines the given checkpoint.
 	 *
 	 * @param jobID Job ID of the running job
 	 * @param executionAttemptID Execution attempt ID of the running task
-	 * @param checkpointID Checkpoint ID of the checkpoint
-	 * @param checkpointTimestamp Timestamp of the checkpoint
+	 * @param checkpointMetaData Meta data for this checkpoint
 	 */
 	void declineCheckpoint(
 		JobID jobID,
 		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		long checkpointTimestamp);
+		CheckpointMetaData checkpointMetaData);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index c2ba7ef..f6720e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -236,33 +237,20 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos) {
-
-		acknowledgeCheckpoint(checkpointId, null,
-				synchronousDurationMillis, asynchronousDurationMillis,
-				bytesBufferedInAlignment, alignmentDurationNanos);
+	public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
+
+		acknowledgeCheckpoint(checkpointMetaData, null);
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData,
+			CheckpointStateHandles checkpointStateHandles) {
 
 
 		checkpointResponder.acknowledgeCheckpoint(
-				jobId, executionId, checkpointId,
-				checkpointStateHandles,
-				synchronousDurationMillis, asynchronousDurationMillis,
-				bytesBufferedInAlignment, alignmentDurationNanos);
+				jobId, executionId, checkpointMetaData,
+				checkpointStateHandles);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 8463fa0..02a41b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
@@ -990,9 +991,12 @@ public class Task implements Runnable, TaskActions {
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.
 	 */
-	public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
+	public void triggerCheckpointBarrier(long checkpointID, long checkpointTimestamp) {
+
 		AbstractInvokable invokable = this.invokable;
 
+		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
+
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
 			if (invokable instanceof StatefulTask) {
 
@@ -1004,9 +1008,9 @@ public class Task implements Runnable, TaskActions {
 					@Override
 					public void run() {
 						try {
-							boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
+							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
 							if (!success) {
-								checkpointResponder.declineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
+								checkpointResponder.declineCheckpoint(jobId, getExecutionId(), checkpointMetaData);
 							}
 						}
 						catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 972f0ea..6289fcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -305,15 +305,17 @@ public class CheckpointCoordinatorTest {
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
 			}
 
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
 			// acknowledge from one of the tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
 			// acknowledge the same task again (should not matter)
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
@@ -549,20 +551,22 @@ public class CheckpointCoordinatorTest {
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
 			}
 
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
 			// acknowledge from one of the tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
 			// acknowledge the same task again (should not matter)
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
 			// acknowledge the other task.
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
 
 			// the checkpoint is internally converted to a successful checkpoint and the
 			// pending checkpoint object is disposed
@@ -593,8 +597,9 @@ public class CheckpointCoordinatorTest {
 			coord.triggerCheckpoint(timestampNew);
 
 			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+			CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew));
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -686,8 +691,10 @@ public class CheckpointCoordinatorTest {
 			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
 					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
 
+			CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
+
 			// acknowledge one of the three tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
 
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
@@ -705,6 +712,8 @@ public class CheckpointCoordinatorTest {
 			}
 			long checkpointId2 = pending2.getCheckpointId();
 
+			CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
+
 			// trigger messages should have been sent
 			verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
 					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
@@ -713,10 +722,10 @@ public class CheckpointCoordinatorTest {
 
 			// we acknowledge the remaining two tasks from the first
 			// checkpoint and two tasks from the second checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
 
 			// now, the first checkpoint should be confirmed
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -728,7 +737,7 @@ public class CheckpointCoordinatorTest {
 					new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);
 
 			// send the last remaining ack for the second checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
 
 			// now, the second checkpoint should be confirmed
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -821,8 +830,10 @@ public class CheckpointCoordinatorTest {
 			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
 					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
 
+			CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
+
 			// acknowledge one of the three tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
 
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
@@ -848,10 +859,12 @@ public class CheckpointCoordinatorTest {
 
 			// we acknowledge one more task from the first checkpoint and the second
 			// checkpoint completely. The second checkpoint should then subsume the first checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+			CheckpointMetaData checkpointMetaData2= new CheckpointMetaData(checkpointId2, 0L);
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
 
 			// now, the second checkpoint should be confirmed, and the first discarded
 			// actually both pending checkpoints are discarded, and the second has been transformed
@@ -875,7 +888,7 @@ public class CheckpointCoordinatorTest {
 					new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
 
 			// send the last remaining ack for the first checkpoint. This should not do anything
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, new CheckpointMetaData(checkpointId1, 0L)));
 
 			coord.shutdown();
 		}
@@ -931,7 +944,7 @@ public class CheckpointCoordinatorTest {
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
 			assertFalse(checkpoint.isDiscarded());
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId()));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(checkpoint.getCheckpointId(), 0L)));
 
 			// wait until the checkpoint must have expired.
 			// we check every 250 msecs conservatively for 5 seconds
@@ -1000,14 +1013,16 @@ public class CheckpointCoordinatorTest {
 			// of the vertices that need to be acknowledged.
 			// none of the messages should throw an exception
 
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
 			// wrong job id
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointMetaData));
 
 			// unknown checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(1L, 0L)));
 
 			// unknown ack vertex
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointMetaData));
 
 			coord.shutdown();
 		}
@@ -1271,9 +1286,10 @@ public class CheckpointCoordinatorTest {
 		assertFalse(pending.canBeSubsumed());
 		assertTrue(pending instanceof PendingSavepoint);
 
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		// acknowledge from one of the tasks
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 		assertEquals(1, pending.getNumberOfAcknowledgedTasks());
 		assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
 		assertFalse(pending.isDiscarded());
@@ -1281,13 +1297,13 @@ public class CheckpointCoordinatorTest {
 		assertFalse(savepointFuture.isCompleted());
 
 		// acknowledge the same task again (should not matter)
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 		assertFalse(pending.isDiscarded());
 		assertFalse(pending.isFullyAcknowledged());
 		assertFalse(savepointFuture.isCompleted());
 
 		// acknowledge the other task.
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
 
 		// the checkpoint is internally converted to a successful checkpoint and the
 		// pending checkpoint object is disposed
@@ -1319,9 +1335,11 @@ public class CheckpointCoordinatorTest {
 		savepointFuture = coord.triggerSavepoint(timestampNew);
 		assertFalse(savepointFuture.isCompleted());
 
+
 		long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+		CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew));
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1386,6 +1404,7 @@ public class CheckpointCoordinatorTest {
 		// Trigger savepoint and checkpoint
 		Future<String> savepointFuture1 = coord.triggerSavepoint(timestamp);
 		long savepointId1 = counter.getLast();
+		CheckpointMetaData checkpointMetaDataS1 = new CheckpointMetaData(savepointId1, 0L);
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
 
 		assertTrue(coord.triggerCheckpoint(timestamp + 1));
@@ -1395,9 +1414,11 @@ public class CheckpointCoordinatorTest {
 		long checkpointId2 = counter.getLast();
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
+		CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
+
 		// 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData2));
 
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
 		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1410,11 +1431,12 @@ public class CheckpointCoordinatorTest {
 
 		Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp + 4);
 		long savepointId2 = counter.getLast();
+		CheckpointMetaData checkpointMetaDataS2 = new CheckpointMetaData(savepointId2, 0L);
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
 		// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS2));
 
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
 		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1424,8 +1446,8 @@ public class CheckpointCoordinatorTest {
 		assertTrue(savepointFuture2.isCompleted());
 
 		// Ack first savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS1));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS1));
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1488,7 +1510,7 @@ public class CheckpointCoordinatorTest {
 					.sendMessageToCurrentExecution(any(TriggerCheckpoint.class), eq(triggerAttemptID));
 			
 			// now, once we acknowledge one checkpoint, it should trigger the next one
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(1L, 0L)));
 			
 			// this should have immediately triggered a new checkpoint
 			now = System.currentTimeMillis();
@@ -1562,7 +1584,7 @@ public class CheckpointCoordinatorTest {
 			// now we acknowledge the second checkpoint, which should subsume the first checkpoint
 			// and allow two more checkpoints to be triggered
 			// now, once we acknowledge one checkpoint, it should trigger the next one
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(2L, 0L)));
 
 			// after a while, there should be the new checkpoints
 			final long newTimeout = System.currentTimeMillis() + 60000;
@@ -1690,7 +1712,7 @@ public class CheckpointCoordinatorTest {
 		// ACK all savepoints
 		long checkpointId = checkpointIDCounter.getLast();
 		for (int i = 0; i < numSavepoints; i++, checkpointId--) {
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, new CheckpointMetaData(checkpointId, 0L)));
 		}
 
 		// After ACKs, all should be completed
@@ -1789,6 +1811,7 @@ public class CheckpointCoordinatorTest {
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
 		List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
@@ -1802,7 +1825,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1816,7 +1839,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1892,6 +1915,7 @@ public class CheckpointCoordinatorTest {
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
 		List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
@@ -1903,7 +1927,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1917,7 +1941,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2005,6 +2029,7 @@ public class CheckpointCoordinatorTest {
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 = 
 				CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
@@ -2020,7 +2045,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2037,7 +2062,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2126,6 +2151,7 @@ public class CheckpointCoordinatorTest {
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 = 
 				CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
@@ -2142,7 +2168,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2158,7 +2184,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2227,7 +2253,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					new CheckpointMetaData(checkpointId, 0L),
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index bb78b6a..c20c604 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -117,12 +117,12 @@ public class CheckpointStateRestoreTest {
 			final long checkpointId = pending.getCheckpointId();
 
 			CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(serializedState, null, serializedKeyGroupStates);
-
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
 
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -217,11 +217,13 @@ public class CheckpointStateRestoreTest {
 			// the difference to the test "testSetState" is that one stateful subtask does not report state
 			CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(serializedState, null, serializedKeyGroupStates);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
 
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 38231ec..6100856 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -458,10 +459,10 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(long checkpointId, long timestamp) {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
 			try {
 				ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
-						InstantiationUtil.serializeObject(checkpointId));
+						InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
 
 				RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
 
@@ -472,9 +473,8 @@ public class JobManagerHARecoveryTest {
 						new CheckpointStateHandles(chainedStateHandle, null, Collections.<KeyGroupsStateHandle>emptyList());
 
 				getEnvironment().acknowledgeCheckpoint(
-						checkpointId,
-						checkpointStateHandles,
-						0L, 0L, 0L, 0L);
+						new CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L),
+						checkpointStateHandles);
 				return true;
 			} catch (Exception ex) {
 				throw new RuntimeException(ex);
@@ -482,8 +482,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(
-				long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
 			throw new UnsupportedOperationException("should not be called!");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 4873335..305625e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -62,7 +63,7 @@ public class CheckpointMessagesTest {
 	public void testConfirmTaskCheckpointed() {
 		try {
 			AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint(
-					new JobID(), new ExecutionAttemptID(), 569345L);
+					new JobID(), new ExecutionAttemptID(), new CheckpointMetaData(569345L, 0L));
 
 			KeyGroupRange keyGroupRange = KeyGroupRange.of(42,42);
 
@@ -75,7 +76,7 @@ public class CheckpointMessagesTest {
 			AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
 					new JobID(),
 					new ExecutionAttemptID(),
-					87658976143L,
+					new CheckpointMetaData(87658976143L, 0L),
 					checkpointStateHandles);
 
 			testSerializabilityEqualsHashCode(noState);

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index c855230..bb07122 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -23,9 +23,9 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -34,16 +34,13 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -154,18 +151,12 @@ public class DummyEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+	public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index c3ed6c0..eb55c4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -45,11 +46,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
@@ -315,18 +312,12 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+	public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7bc2c29..e2abe88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,11 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -36,19 +34,20 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
-
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-
 import org.apache.flink.util.SerializedValue;
 import org.junit.Before;
 import org.junit.Test;
@@ -217,9 +216,9 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(long checkpointId, long timestamp) {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
 			lastCheckpointId++;
-			if (checkpointId == lastCheckpointId) {
+			if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
 				if (lastCheckpointId == NUM_CALLS) {
 					triggerLatch.trigger();
 				}
@@ -234,8 +233,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(
-				long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
 			throw new UnsupportedOperationException("Should not be called");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 4b6cac7..5b18c75 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -122,6 +123,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 */
 	public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
 		this.topics = checkNotNull(topics);
+		checkArgument(topics.size() > 0, "You have to define at least one topic.");
 		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 76a69bc..6d2dc70 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -36,6 +36,7 @@ import org.mockito.Matchers;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -159,7 +160,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		assertFalse(listState.get().iterator().hasNext());
 	}
-	
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testSnapshotState() throws Exception {
@@ -177,9 +178,9 @@ public class FlinkKafkaConsumerBaseTest {
 
 		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
 		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
-			
+
 		final LinkedMap pendingCheckpoints = new LinkedMap();
-	
+
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingCheckpoints, true);
 		assertEquals(0, pendingCheckpoints.size());
 
@@ -222,7 +223,7 @@ public class FlinkKafkaConsumerBaseTest {
 		assertEquals(state2, snapshot2);
 		assertEquals(2, pendingCheckpoints.size());
 		assertEquals(state2, pendingCheckpoints.get(140L));
-		
+
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
 		assertEquals(1, pendingCheckpoints.size());
@@ -241,7 +242,7 @@ public class FlinkKafkaConsumerBaseTest {
 		assertEquals(state3, snapshot3);
 		assertEquals(2, pendingCheckpoints.size());
 		assertEquals(state3, pendingCheckpoints.get(141L));
-		
+
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
 		assertEquals(0, pendingCheckpoints.size());
@@ -302,7 +303,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		@SuppressWarnings("unchecked")
 		public DummyFlinkKafkaConsumer() {
-			super(Arrays.asList("abc", "def"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
+			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index d60c999..7f01129 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,20 +17,20 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayDeque;
+
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
  * all inputs have received the barrier for a given checkpoint.
@@ -219,9 +219,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			releaseBlocks();
 
 			if (toNotifyOnCheckpoint != null) {
-				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
-						receivedBarrier.getId(), receivedBarrier.getTimestamp(),
-						bufferSpiller.getBytesWritten(), latestAlignmentDurationNanos);
+				CheckpointMetaData checkpointMetaData =
+						new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+				checkpointMetaData.
+						setBytesBufferedInAlignment(bufferSpiller.getBytesWritten()).
+						setAlignmentDurationNanos(latestAlignmentDurationNanos);
+
+				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 1db5845..86945a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,10 +19,11 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.util.ArrayDeque;
 
@@ -115,8 +116,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
 			if (toNotifyOnCheckpoint != null) {
-				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
-						receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
+				CheckpointMetaData checkpointMetaData =
+						new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+
+				checkpointMetaData.
+						setBytesBufferedInAlignment(0L).
+						setAlignmentDurationNanos(0L);
+
+				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
 			}
 			return;
 		}
@@ -149,8 +156,13 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 				
 				// notify the listener
 				if (toNotifyOnCheckpoint != null) {
-					toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
-							receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
+					CheckpointMetaData checkpointMetaData =
+							new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+					checkpointMetaData.
+							setBytesBufferedInAlignment(0L).
+							setAlignmentDurationNanos(0L);
+
+					toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
 				}
 			}
 		}