Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/06 15:04:40 UTC

[1/4] flink git commit: [hotfix] Minor cleanup in FlinkKafkaConsumerBase and Test

Repository: flink
Updated Branches:
  refs/heads/master 98710ead5 -> 0dac7ad00


[hotfix] Minor cleanup in FlinkKafkaConsumerBase and Test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/241d47b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/241d47b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/241d47b8

Branch: refs/heads/master
Commit: 241d47b867cc6bb21d11c6416697bd078885cf6d
Parents: fee1430
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Oct 2 11:30:59 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 6 17:04:09 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumerBase.java          |  3 ---
 .../connectors/kafka/FlinkKafkaConsumerBaseTest.java      | 10 +++++-----
 2 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/241d47b8/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 8d63345..4b6cac7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -47,7 +47,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -123,8 +122,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 */
 	public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
 		this.topics = checkNotNull(topics);
-		checkArgument(topics.size() > 0, "You have to define at least one topic.");
-
 		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/241d47b8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 373d6ab..76a69bc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -36,7 +36,6 @@ import org.mockito.Matchers;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -173,9 +172,9 @@ public class FlinkKafkaConsumerBaseTest {
 		state2.put(new KafkaTopicPartition("def", 7), 987654329L);
 
 		final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
-		state2.put(new KafkaTopicPartition("abc", 13), 16780L);
-		state2.put(new KafkaTopicPartition("def", 7), 987654377L);
-		
+		state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+		state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+
 		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
 		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
 			
@@ -186,12 +185,13 @@ public class FlinkKafkaConsumerBaseTest {
 
 		OperatorStateStore backend = mock(OperatorStateStore.class);
 
+		TestingListState<Serializable> init = new TestingListState<>();
 		TestingListState<Serializable> listState1 = new TestingListState<>();
 		TestingListState<Serializable> listState2 = new TestingListState<>();
 		TestingListState<Serializable> listState3 = new TestingListState<>();
 
 		when(backend.getSerializableListState(Matchers.any(String.class))).
-				thenReturn(listState1, listState1, listState2, listState3);
+				thenReturn(init, listState1, listState2, listState3);
 
 		consumer.initializeState(backend);
 


[4/4] flink git commit: [FLINK-4730] Introducing CheckpointMetaData

Posted by uc...@apache.org.
[FLINK-4730] Introducing CheckpointMetaData

This closes #2583.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dac7ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dac7ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dac7ad0

Branch: refs/heads/master
Commit: 0dac7ad00a25c4b038b57009f41f5512f8ce73c3
Parents: 241d47b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Oct 2 14:19:37 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 6 17:04:12 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  15 +--
 .../runtime/checkpoint/CheckpointMetaData.java  | 113 ++++++++++++++++
 .../runtime/checkpoint/CheckpointMetrics.java   | 131 +++++++++++++++++++
 .../flink/runtime/execution/Environment.java    |  41 ++----
 .../runtime/jobgraph/tasks/StatefulTask.java    |  16 +--
 .../checkpoint/AcknowledgeCheckpoint.java       |  56 +++-----
 .../ActorGatewayCheckpointResponder.java        |  22 ++--
 .../taskmanager/CheckpointResponder.java        |  30 ++---
 .../runtime/taskmanager/RuntimeEnvironment.java |  28 ++--
 .../apache/flink/runtime/taskmanager/Task.java  |  10 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 118 ++++++++++-------
 .../checkpoint/CheckpointStateRestoreTest.java  |  24 ++--
 .../jobmanager/JobManagerHARecoveryTest.java    |  13 +-
 .../messages/CheckpointMessagesTest.java        |   5 +-
 .../operators/testutils/DummyEnvironment.java   |  17 +--
 .../operators/testutils/MockEnvironment.java    |  15 +--
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  16 +--
 .../kafka/FlinkKafkaConsumerBase.java           |   2 +
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  13 +-
 .../streaming/runtime/io/BarrierBuffer.java     |  20 +--
 .../streaming/runtime/io/BarrierTracker.java    |  22 +++-
 .../streaming/runtime/tasks/StreamTask.java     |  71 +++++-----
 .../streaming/runtime/io/BarrierBufferTest.java |  17 ++-
 .../runtime/io/BarrierTrackerTest.java          |  11 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |  12 +-
 .../runtime/tasks/SourceStreamTaskTest.java     |   6 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  19 +--
 27 files changed, 515 insertions(+), 348 deletions(-)
----------------------------------------------------------------------
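
At a high level, this refactoring collapses the loose checkpoint arguments (checkpoint ID, timestamp, sync/async durations, alignment bytes and alignment duration) into a single CheckpointMetaData object backed by a CheckpointMetrics instance. A minimal sketch of the new call pattern, based only on the signatures introduced below (the concrete values are made up):

    CheckpointMetaData meta = new CheckpointMetaData(42L, System.currentTimeMillis());
    meta.setSyncDurationMillis(5L)
        .setAsyncDurationMillis(12L)
        .setBytesBufferedInAlignment(1024L)
        .setAlignmentDurationNanos(2_000_000L);

    // Callers now pass this one object instead of up to six long parameters, e.g.:
    //   environment.acknowledgeCheckpoint(meta, checkpointStateHandles);
    //   statefulTask.triggerCheckpoint(meta);
    //   checkpointResponder.declineCheckpoint(jobId, executionAttemptId, meta);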


http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 2ebd84a..8f58075 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
@@ -134,14 +135,10 @@ public class RocksDBAsyncSnapshotTest {
 
 			@Override
 			public void acknowledgeCheckpoint(
-					long checkpointId,
-					CheckpointStateHandles checkpointStateHandles,
-					long synchronousDurationMillis, long asynchronousDurationMillis,
-					long bytesBufferedInAlignment, long alignmentDurationNanos) {
+					CheckpointMetaData checkpointMetaData,
+					CheckpointStateHandles checkpointStateHandles) {
 
-				super.acknowledgeCheckpoint(checkpointId, checkpointStateHandles,
-						synchronousDurationMillis, asynchronousDurationMillis,
-						bytesBufferedInAlignment, alignmentDurationNanos);
+				super.acknowledgeCheckpoint(checkpointMetaData);
 
 				// block on the latch, to verify that triggerCheckpoint returns below,
 				// even though the async checkpoint would not finish
@@ -171,7 +168,7 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		task.triggerCheckpoint(42, 17);
+		task.triggerCheckpoint(new CheckpointMetaData(42, 17));
 
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 
@@ -250,7 +247,7 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		task.triggerCheckpoint(42, 17);
+		task.triggerCheckpoint(new CheckpointMetaData(42, 17));
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 		BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
 		task.cancel();

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
new file mode 100644
index 0000000..6f117f2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Encapsulates all the meta data for a checkpoint.
+ */
+public class CheckpointMetaData implements Serializable {
+
+	private static final long serialVersionUID = -2387652345781312442L;
+
+	/** The ID of the checkpoint */
+	private final long checkpointId;
+
+	/** The timestamp of the checkpoint */
+	private final long timestamp;
+
+	private final CheckpointMetrics metrics;
+
+	public CheckpointMetaData(long checkpointId, long timestamp) {
+		this.checkpointId = checkpointId;
+		this.timestamp = timestamp;
+		this.metrics = new CheckpointMetrics();
+	}
+
+	public CheckpointMetaData(
+			long checkpointId,
+			long timestamp,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
+		this.checkpointId = checkpointId;
+		this.timestamp = timestamp;
+		this.metrics = new CheckpointMetrics(
+				bytesBufferedInAlignment,
+				alignmentDurationNanos,
+				synchronousDurationMillis,
+				asynchronousDurationMillis);
+	}
+
+	public CheckpointMetrics getMetrics() {
+		return metrics;
+	}
+
+	public CheckpointMetaData setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
+		Preconditions.checkArgument(bytesBufferedInAlignment >= 0);
+		this.metrics.setBytesBufferedInAlignment(bytesBufferedInAlignment);
+		return this;
+	}
+
+	public CheckpointMetaData setAlignmentDurationNanos(long alignmentDurationNanos) {
+		Preconditions.checkArgument(alignmentDurationNanos >= 0);
+		this.metrics.setAlignmentDurationNanos(alignmentDurationNanos);
+		return this;
+	}
+
+	public CheckpointMetaData setSyncDurationMillis(long syncDurationMillis) {
+		Preconditions.checkArgument(syncDurationMillis >= 0);
+		this.metrics.setSyncDurationMillis(syncDurationMillis);
+		return this;
+	}
+
+	public CheckpointMetaData setAsyncDurationMillis(long asyncDurationMillis) {
+		Preconditions.checkArgument(asyncDurationMillis >= 0);
+		this.metrics.setAsyncDurationMillis(asyncDurationMillis);
+		return this;
+	}
+
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public long getBytesBufferedInAlignment() {
+		return metrics.getBytesBufferedInAlignment();
+	}
+
+	public long getAlignmentDurationNanos() {
+		return metrics.getAlignmentDurationNanos();
+	}
+
+	public long getSyncDurationMillis() {
+		return metrics.getSyncDurationMillis();
+	}
+
+	public long getAsyncDurationMillis() {
+		return metrics.getAsyncDurationMillis();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
new file mode 100644
index 0000000..4155290
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+public class CheckpointMetrics implements Serializable {
+
+	/**
+	 * The number of bytes that were buffered during the checkpoint alignment phase
+	 */
+	private long bytesBufferedInAlignment;
+
+	/**
+	 * The duration (in nanoseconds) that the stream alignment for the checkpoint took
+	 */
+	private long alignmentDurationNanos;
+
+	/* The duration (in milliseconds) of the synchronous part of the operator checkpoint */
+	private long syncDurationMillis;
+
+	/* The duration (in milliseconds) of the asynchronous part of the operator checkpoint  */
+	private long asyncDurationMillis;
+
+	public CheckpointMetrics() {
+		this(-1L, -1L, -1L, -1L);
+	}
+
+	public CheckpointMetrics(
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos,
+			long syncDurationMillis,
+			long asyncDurationMillis) {
+
+		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+		this.alignmentDurationNanos = alignmentDurationNanos;
+		this.syncDurationMillis = syncDurationMillis;
+		this.asyncDurationMillis = asyncDurationMillis;
+	}
+
+	public long getBytesBufferedInAlignment() {
+		return bytesBufferedInAlignment;
+	}
+
+	public void setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
+		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+	}
+
+	public long getAlignmentDurationNanos() {
+		return alignmentDurationNanos;
+	}
+
+	public void setAlignmentDurationNanos(long alignmentDurationNanos) {
+		this.alignmentDurationNanos = alignmentDurationNanos;
+	}
+
+	public long getSyncDurationMillis() {
+		return syncDurationMillis;
+	}
+
+	public void setSyncDurationMillis(long syncDurationMillis) {
+		this.syncDurationMillis = syncDurationMillis;
+	}
+
+	public long getAsyncDurationMillis() {
+		return asyncDurationMillis;
+	}
+
+	public void setAsyncDurationMillis(long asyncDurationMillis) {
+		this.asyncDurationMillis = asyncDurationMillis;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		CheckpointMetrics that = (CheckpointMetrics) o;
+
+		if (bytesBufferedInAlignment != that.bytesBufferedInAlignment) {
+			return false;
+		}
+		if (alignmentDurationNanos != that.alignmentDurationNanos) {
+			return false;
+		}
+		if (syncDurationMillis != that.syncDurationMillis) {
+			return false;
+		}
+		return asyncDurationMillis == that.asyncDurationMillis;
+
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (bytesBufferedInAlignment ^ (bytesBufferedInAlignment >>> 32));
+		result = 31 * result + (int) (alignmentDurationNanos ^ (alignmentDurationNanos >>> 32));
+		result = 31 * result + (int) (syncDurationMillis ^ (syncDurationMillis >>> 32));
+		result = 31 * result + (int) (asyncDurationMillis ^ (asyncDurationMillis >>> 32));
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "CheckpointMetrics{" +
+				"bytesBufferedInAlignment=" + bytesBufferedInAlignment +
+				", alignmentDurationNanos=" + alignmentDurationNanos +
+				", syncDurationMillis=" + syncDurationMillis +
+				", asyncDurationMillis=" + asyncDurationMillis +
+				'}';
+	}
+}
\ No newline at end of file
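
Worth noting from the class above: the no-argument constructor initializes every metric to -1, which the rest of this commit treats as "unknown or not set" (AcknowledgeCheckpoint below only requires the values to be >= -1), and equals()/hashCode() compare the four fields by value. A small illustrative check, assuming the class exactly as shown:

    CheckpointMetrics unknown  = new CheckpointMetrics();                  // all fields -1
    CheckpointMetrics explicit = new CheckpointMetrics(-1L, -1L, -1L, -1L);
    System.out.println(unknown.equals(explicit));                          // prints "true"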

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index f6cde95..cbbeec7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -19,20 +19,21 @@
 package org.apache.flink.runtime.execution;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KvState;
@@ -161,47 +162,21 @@ public interface Environment {
 	 * to for the checkpoint with the give checkpoint-ID. This method does not include
 	 * any state in the checkpoint.
 	 * 
-	 * @param checkpointId
-	 *             The ID of the checkpoint.
-	 * @param synchronousDurationMillis
-	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
-	 * @param asynchronousDurationMillis
-	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
-	 * @param bytesBufferedInAlignment
-	 *             The number of bytes that were buffered during the checkpoint alignment phase
-	 * @param alignmentDurationNanos
-	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took   
+	 * @param checkpointMetaData the meta data for this checkpoint
 	 */
-	void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos);
+	void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData);
 
 	/**
 	 * Confirms that the invokable has successfully completed all required steps for
 	 * the checkpoint with the give checkpoint-ID. This method does include
 	 * the given state in the checkpoint.
 	 *
-	 * @param checkpointId The ID of the checkpoint.
 	 * @param checkpointStateHandles All state handles for the checkpointed state
-	 * @param synchronousDurationMillis
-	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
-	 * @param asynchronousDurationMillis
-	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
-	 * @param bytesBufferedInAlignment
-	 *             The number of bytes that were buffered during the checkpoint alignment phase
-	 * @param alignmentDurationNanos
-	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took   
+	 * @param checkpointMetaData the meta data for this checkpoint
 	 */
 	void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos);
+			CheckpointMetaData checkpointMetaData,
+			CheckpointStateHandles checkpointStateHandles);
 
 	/**
 	 * Marks task execution failed for an external reason (a reason other than the task code itself

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 55e3e09..e1d15e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -52,29 +53,24 @@ public interface StatefulTask {
 	 * 
 	 * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
 	 * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
-	 * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(long, long, long, long)}
+	 * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData)}
 	 * method.
 	 *
-	 * @param checkpointId The ID of the checkpoint, strictly incrementing.
-	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+	 * @param checkpointMetaData Meta data for about this checkpoint
 	 *
 	 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
 	 */
-	boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
+	boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception;
 
 	/**
 	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
 	 * barriers on all input streams.
 	 * 
-	 * @param checkpointId The ID of the checkpoint, strictly incrementing.
-	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
-	 * @param bytesAligned The number of bytes that were buffered during the alignment of the streams.
-	 * @param alignmentTimeNanos The time that the stream alignment took, in nanoseconds.   
+	 * @param checkpointMetaData Meta data for about this checkpoint
 	 * 
 	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
 	 */
-	void triggerCheckpointOnBarrier(
-			long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception;
+	void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception;
 
 	/**
 	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index e95e7b3..ac14d3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 
@@ -39,59 +40,32 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 
 	private final CheckpointStateHandles checkpointStateHandles;
 
-	/** The duration (in milliseconds) that the synchronous part of the checkpoint took */
-	private final long synchronousDurationMillis;
-
-	/** The duration (in milliseconds) that the asynchronous part of the checkpoint took */
-	private final long asynchronousDurationMillis;
-
-	/** The number of bytes that were buffered during the checkpoint alignment phase */
-	private final long bytesBufferedInAlignment;
-
-	/** The duration (in nanoseconds) that the alignment phase of the task's checkpoint took */
-	private final long alignmentDurationNanos;
+	private final CheckpointMetaData checkpointMetaData;
 
 	// ------------------------------------------------------------------------
 
 	public AcknowledgeCheckpoint(
 			JobID job,
 			ExecutionAttemptID taskExecutionId,
-			long checkpointId) {
-		this(job, taskExecutionId, checkpointId, null);
+			CheckpointMetaData checkpointMetaData) {
+		this(job, taskExecutionId, checkpointMetaData, null);
 	}
 
 	public AcknowledgeCheckpoint(
 			JobID job,
 			ExecutionAttemptID taskExecutionId,
-			long checkpointId,
+			CheckpointMetaData checkpointMetaData,
 			CheckpointStateHandles checkpointStateHandles) {
-		this(job, taskExecutionId, checkpointId, checkpointStateHandles, -1L, -1L, -1L, -1L);
-	}
 
-	public AcknowledgeCheckpoint(
-			JobID job,
-			ExecutionAttemptID taskExecutionId,
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos) {
-
-		super(job, taskExecutionId, checkpointId);
+		super(job, taskExecutionId, checkpointMetaData.getCheckpointId());
 
 		this.checkpointStateHandles = checkpointStateHandles;
-
+		this.checkpointMetaData = checkpointMetaData;
 		// these may be "-1", in case the values are unknown or not set
-		checkArgument(synchronousDurationMillis >= -1);
-		checkArgument(asynchronousDurationMillis >= -1);
-		checkArgument(bytesBufferedInAlignment >= -1);
-		checkArgument(alignmentDurationNanos >= -1);
-
-		this.synchronousDurationMillis = synchronousDurationMillis;
-		this.asynchronousDurationMillis = asynchronousDurationMillis;
-		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
-		this.alignmentDurationNanos = alignmentDurationNanos;
+		checkArgument(checkpointMetaData.getSyncDurationMillis() >= -1);
+		checkArgument(checkpointMetaData.getAsyncDurationMillis() >= -1);
+		checkArgument(checkpointMetaData.getBytesBufferedInAlignment() >= -1);
+		checkArgument(checkpointMetaData.getAlignmentDurationNanos() >= -1);
 	}
 
 	// ------------------------------------------------------------------------
@@ -103,19 +77,19 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 	}
 
 	public long getSynchronousDurationMillis() {
-		return synchronousDurationMillis;
+		return checkpointMetaData.getSyncDurationMillis();
 	}
 
 	public long getAsynchronousDurationMillis() {
-		return asynchronousDurationMillis;
+		return checkpointMetaData.getAsyncDurationMillis();
 	}
 
 	public long getBytesBufferedInAlignment() {
-		return bytesBufferedInAlignment;
+		return checkpointMetaData.getBytesBufferedInAlignment();
 	}
 
 	public long getAlignmentDurationNanos() {
-		return alignmentDurationNanos;
+		return checkpointMetaData.getAlignmentDurationNanos();
 	}
 
 	// --------------------------------------------------------------------------------------------
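
Putting the message class above together with CheckpointMetaData: an acknowledgement is now built from the meta data object, and the old metric getters simply delegate to it. A brief sketch with made-up IDs, matching the constructors shown in this diff:

    JobID jobId = new JobID();
    ExecutionAttemptID attemptId = new ExecutionAttemptID();
    CheckpointMetaData meta = new CheckpointMetaData(17L, System.currentTimeMillis());

    // State-less acknowledgement; the state handles default to null:
    AcknowledgeCheckpoint ack = new AcknowledgeCheckpoint(jobId, attemptId, meta);

    // The metrics were never set, so the getters report the -1 sentinel:
    long syncMillis = ack.getSynchronousDurationMillis();   // -1
    long alignNanos = ack.getAlignmentDurationNanos();      // -1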

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index 8bf1127..6f1bf7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -41,18 +42,12 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
 	public void acknowledgeCheckpoint(
 			JobID jobID,
 			ExecutionAttemptID executionAttemptID,
-			long checkpointID,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData,
+			CheckpointStateHandles checkpointStateHandles) {
 
 		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
-				jobID, executionAttemptID, checkpointID,
-				checkpointStateHandles,
-				synchronousDurationMillis, asynchronousDurationMillis,
-				bytesBufferedInAlignment, alignmentDurationNanos);
+				jobID, executionAttemptID, checkpointMetaData,
+				checkpointStateHandles);
 
 		actorGateway.tell(message);
 	}
@@ -61,14 +56,13 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
 	public void declineCheckpoint(
 		JobID jobID,
 		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		long checkpointTimestamp) {
+		CheckpointMetaData checkpointMetaData) {
 
 		DeclineCheckpoint decline = new DeclineCheckpoint(
 			jobID,
 			executionAttemptID,
-			checkpointID,
-			checkpointTimestamp);
+			checkpointMetaData.getCheckpointId(),
+			checkpointMetaData.getTimestamp());
 
 		actorGateway.tell(decline);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index 698a7f4..4fa20e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 
@@ -34,40 +35,27 @@ public interface CheckpointResponder {
 	 *             Job ID of the running job
 	 * @param executionAttemptID
 	 *             Execution attempt ID of the running task
-	 * @param checkpointID
-	 *             Checkpoint ID of the checkpoint
-	 * @param checkpointStateHandles 
+	 * @param checkpointStateHandles
 	 *             State handles for the checkpoint
-	 * @param synchronousDurationMillis
-	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
-	 * @param asynchronousDurationMillis
-	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
-	 * @param bytesBufferedInAlignment
-	 *             The number of bytes that were buffered during the checkpoint alignment phase
-	 * @param alignmentDurationNanos
-	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took
+	 * @param checkpointMetaData
+	 *             Meta data for this checkpoint
+	 *
 	 */
 	void acknowledgeCheckpoint(
 		JobID jobID,
 		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		CheckpointStateHandles checkpointStateHandles,
-		long synchronousDurationMillis,
-		long asynchronousDurationMillis,
-		long bytesBufferedInAlignment,
-		long alignmentDurationNanos);
+		CheckpointMetaData checkpointMetaData,
+		CheckpointStateHandles checkpointStateHandles);
 
 	/**
 	 * Declines the given checkpoint.
 	 *
 	 * @param jobID Job ID of the running job
 	 * @param executionAttemptID Execution attempt ID of the running task
-	 * @param checkpointID Checkpoint ID of the checkpoint
-	 * @param checkpointTimestamp Timestamp of the checkpoint
+	 * @param checkpointMetaData Meta data for this checkpoint
 	 */
 	void declineCheckpoint(
 		JobID jobID,
 		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		long checkpointTimestamp);
+		CheckpointMetaData checkpointMetaData);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index c2ba7ef..f6720e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -236,33 +237,20 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos) {
-
-		acknowledgeCheckpoint(checkpointId, null,
-				synchronousDurationMillis, asynchronousDurationMillis,
-				bytesBufferedInAlignment, alignmentDurationNanos);
+	public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
+
+		acknowledgeCheckpoint(checkpointMetaData, null);
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData,
+			CheckpointStateHandles checkpointStateHandles) {
 
 
 		checkpointResponder.acknowledgeCheckpoint(
-				jobId, executionId, checkpointId,
-				checkpointStateHandles,
-				synchronousDurationMillis, asynchronousDurationMillis,
-				bytesBufferedInAlignment, alignmentDurationNanos);
+				jobId, executionId, checkpointMetaData,
+				checkpointStateHandles);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 8463fa0..02a41b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
@@ -990,9 +991,12 @@ public class Task implements Runnable, TaskActions {
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.
 	 */
-	public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
+	public void triggerCheckpointBarrier(long checkpointID, long checkpointTimestamp) {
+
 		AbstractInvokable invokable = this.invokable;
 
+		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
+
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
 			if (invokable instanceof StatefulTask) {
 
@@ -1004,9 +1008,9 @@ public class Task implements Runnable, TaskActions {
 					@Override
 					public void run() {
 						try {
-							boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
+							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
 							if (!success) {
-								checkpointResponder.declineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
+								checkpointResponder.declineCheckpoint(jobId, getExecutionId(), checkpointMetaData);
 							}
 						}
 						catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 972f0ea..6289fcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -305,15 +305,17 @@ public class CheckpointCoordinatorTest {
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
 			}
 
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
 			// acknowledge from one of the tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
 			// acknowledge the same task again (should not matter)
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
@@ -549,20 +551,22 @@ public class CheckpointCoordinatorTest {
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
 			}
 
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
 			// acknowledge from one of the tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
 			// acknowledge the same task again (should not matter)
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
 			// acknowledge the other task.
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
 
 			// the checkpoint is internally converted to a successful checkpoint and the
 			// pending checkpoint object is disposed
@@ -593,8 +597,9 @@ public class CheckpointCoordinatorTest {
 			coord.triggerCheckpoint(timestampNew);
 
 			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+			CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew));
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -686,8 +691,10 @@ public class CheckpointCoordinatorTest {
 			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
 					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
 
+			CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
+
 			// acknowledge one of the three tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
 
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
@@ -705,6 +712,8 @@ public class CheckpointCoordinatorTest {
 			}
 			long checkpointId2 = pending2.getCheckpointId();
 
+			CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
+
 			// trigger messages should have been sent
 			verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
 					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
@@ -713,10 +722,10 @@ public class CheckpointCoordinatorTest {
 
 			// we acknowledge the remaining two tasks from the first
 			// checkpoint and two tasks from the second checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
 
 			// now, the first checkpoint should be confirmed
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -728,7 +737,7 @@ public class CheckpointCoordinatorTest {
 					new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);
 
 			// send the last remaining ack for the second checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
 
 			// now, the second checkpoint should be confirmed
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -821,8 +830,10 @@ public class CheckpointCoordinatorTest {
 			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
 					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
 
+			CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
+
 			// acknowledge one of the three tasks
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
 
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
@@ -848,10 +859,12 @@ public class CheckpointCoordinatorTest {
 
 			// we acknowledge one more task from the first checkpoint and the second
 			// checkpoint completely. The second checkpoint should then subsume the first checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+			CheckpointMetaData checkpointMetaData2= new CheckpointMetaData(checkpointId2, 0L);
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
 
 			// now, the second checkpoint should be confirmed, and the first discarded
 			// actually both pending checkpoints are discarded, and the second has been transformed
@@ -875,7 +888,7 @@ public class CheckpointCoordinatorTest {
 					new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
 
 			// send the last remaining ack for the first checkpoint. This should not do anything
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, new CheckpointMetaData(checkpointId1, 0L)));
 
 			coord.shutdown();
 		}
@@ -931,7 +944,7 @@ public class CheckpointCoordinatorTest {
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
 			assertFalse(checkpoint.isDiscarded());
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId()));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(checkpoint.getCheckpointId(), 0L)));
 
 			// wait until the checkpoint must have expired.
 			// we check every 250 msecs conservatively for 5 seconds
@@ -1000,14 +1013,16 @@ public class CheckpointCoordinatorTest {
 			// of the vertices that need to be acknowledged.
 			// non of the messages should throw an exception
 
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
 			// wrong job id
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointMetaData));
 
 			// unknown checkpoint
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(1L, 0L)));
 
 			// unknown ack vertex
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointMetaData));
 
 			coord.shutdown();
 		}
@@ -1271,9 +1286,10 @@ public class CheckpointCoordinatorTest {
 		assertFalse(pending.canBeSubsumed());
 		assertTrue(pending instanceof PendingSavepoint);
 
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		// acknowledge from one of the tasks
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 		assertEquals(1, pending.getNumberOfAcknowledgedTasks());
 		assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
 		assertFalse(pending.isDiscarded());
@@ -1281,13 +1297,13 @@ public class CheckpointCoordinatorTest {
 		assertFalse(savepointFuture.isCompleted());
 
 		// acknowledge the same task again (should not matter)
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
 		assertFalse(pending.isDiscarded());
 		assertFalse(pending.isFullyAcknowledged());
 		assertFalse(savepointFuture.isCompleted());
 
 		// acknowledge the other task.
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
 
 		// the checkpoint is internally converted to a successful checkpoint and the
 		// pending checkpoint object is disposed
@@ -1319,9 +1335,11 @@ public class CheckpointCoordinatorTest {
 		savepointFuture = coord.triggerSavepoint(timestampNew);
 		assertFalse(savepointFuture.isCompleted());
 
+
 		long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+		CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew));
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1386,6 +1404,7 @@ public class CheckpointCoordinatorTest {
 		// Trigger savepoint and checkpoint
 		Future<String> savepointFuture1 = coord.triggerSavepoint(timestamp);
 		long savepointId1 = counter.getLast();
+		CheckpointMetaData checkpointMetaDataS1 = new CheckpointMetaData(savepointId1, 0L);
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
 
 		assertTrue(coord.triggerCheckpoint(timestamp + 1));
@@ -1395,9 +1414,11 @@ public class CheckpointCoordinatorTest {
 		long checkpointId2 = counter.getLast();
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
+		CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
+
 		// 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData2));
 
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
 		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1410,11 +1431,12 @@ public class CheckpointCoordinatorTest {
 
 		Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp + 4);
 		long savepointId2 = counter.getLast();
+		CheckpointMetaData checkpointMetaDataS2 = new CheckpointMetaData(savepointId2, 0L);
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
 		// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS2));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS2));
 
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
 		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1424,8 +1446,8 @@ public class CheckpointCoordinatorTest {
 		assertTrue(savepointFuture2.isCompleted());
 
 		// Ack first savepoint
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS1));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS1));
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1488,7 +1510,7 @@ public class CheckpointCoordinatorTest {
 					.sendMessageToCurrentExecution(any(TriggerCheckpoint.class), eq(triggerAttemptID));
 			
 			// now, once we acknowledge one checkpoint, it should trigger the next one
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(1L, 0L)));
 			
 			// this should have immediately triggered a new checkpoint
 			now = System.currentTimeMillis();
@@ -1562,7 +1584,7 @@ public class CheckpointCoordinatorTest {
 			// now we acknowledge the second checkpoint, which should subsume the first checkpoint
 			// and allow two more checkpoints to be triggered
 			// now, once we acknowledge one checkpoint, it should trigger the next one
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(2L, 0L)));
 
 			// after a while, there should be the new checkpoints
 			final long newTimeout = System.currentTimeMillis() + 60000;
@@ -1690,7 +1712,7 @@ public class CheckpointCoordinatorTest {
 		// ACK all savepoints
 		long checkpointId = checkpointIDCounter.getLast();
 		for (int i = 0; i < numSavepoints; i++, checkpointId--) {
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, new CheckpointMetaData(checkpointId, 0L)));
 		}
 
 		// After ACKs, all should be completed
@@ -1789,6 +1811,7 @@ public class CheckpointCoordinatorTest {
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
 		List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
@@ -1802,7 +1825,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1816,7 +1839,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1892,6 +1915,7 @@ public class CheckpointCoordinatorTest {
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
 		List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
@@ -1903,7 +1927,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1917,7 +1941,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2005,6 +2029,7 @@ public class CheckpointCoordinatorTest {
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 = 
 				CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
@@ -2020,7 +2045,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2037,7 +2062,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2126,6 +2151,7 @@ public class CheckpointCoordinatorTest {
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 = 
 				CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
@@ -2142,7 +2168,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2158,7 +2184,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					checkpointMetaData,
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2227,7 +2253,7 @@ public class CheckpointCoordinatorTest {
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
+					new CheckpointMetaData(checkpointId, 0L),
 					checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
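
The hunks above all apply the same substitution: the raw checkpoint id argument of AcknowledgeCheckpoint is replaced by a CheckpointMetaData built from that id and a timestamp. A minimal sketch of the resulting call shape, reusing identifiers from the test above (coord, jid, attemptID1, checkpointStateHandles) and assuming only the two-argument CheckpointMetaData(checkpointId, timestamp) constructor used throughout this patch:

    long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
    CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);

    // acknowledgement without state
    coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));

    // acknowledgement carrying state handles
    coord.receiveAcknowledgeMessage(
            new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData, checkpointStateHandles));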

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index bb78b6a..c20c604 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -117,12 +117,12 @@ public class CheckpointStateRestoreTest {
 			final long checkpointId = pending.getCheckpointId();
 
 			CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(serializedState, null, serializedKeyGroupStates);
-
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
 
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -217,11 +217,13 @@ public class CheckpointStateRestoreTest {
 			// the difference to the test "testSetState" is that one stateful subtask does not report state
 			CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(serializedState, null, serializedKeyGroupStates);
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
 
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 38231ec..6100856 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -458,10 +459,10 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(long checkpointId, long timestamp) {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
 			try {
 				ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
-						InstantiationUtil.serializeObject(checkpointId));
+						InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
 
 				RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
 
@@ -472,9 +473,8 @@ public class JobManagerHARecoveryTest {
 						new CheckpointStateHandles(chainedStateHandle, null, Collections.<KeyGroupsStateHandle>emptyList());
 
 				getEnvironment().acknowledgeCheckpoint(
-						checkpointId,
-						checkpointStateHandles,
-						0L, 0L, 0L, 0L);
+						new CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L),
+						checkpointStateHandles);
 				return true;
 			} catch (Exception ex) {
 				throw new RuntimeException(ex);
@@ -482,8 +482,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(
-				long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
 			throw new UnsupportedOperationException("should not be called!");
 		}
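
For implementers of StatefulTask, the stub above illustrates the migration: the former (checkpointId, timestamp, ...) parameter lists collapse into a single CheckpointMetaData, and the individual values are read back through getters. A sketch of the general shape, assuming a task with access to getEnvironment() as in the stub (the body is illustrative, not the stub's actual logic):

    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
        long checkpointId = checkpointMetaData.getCheckpointId();
        // ... snapshot state for 'checkpointId', then acknowledge with the same meta data
        getEnvironment().acknowledgeCheckpoint(checkpointMetaData);
        return true;
    }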
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 4873335..305625e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -62,7 +63,7 @@ public class CheckpointMessagesTest {
 	public void testConfirmTaskCheckpointed() {
 		try {
 			AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint(
-					new JobID(), new ExecutionAttemptID(), 569345L);
+					new JobID(), new ExecutionAttemptID(), new CheckpointMetaData(569345L, 0L));
 
 			KeyGroupRange keyGroupRange = KeyGroupRange.of(42,42);
 
@@ -75,7 +76,7 @@ public class CheckpointMessagesTest {
 			AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
 					new JobID(),
 					new ExecutionAttemptID(),
-					87658976143L,
+					new CheckpointMetaData(87658976143L, 0L),
 					checkpointStateHandles);
 
 			testSerializabilityEqualsHashCode(noState);

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index c855230..bb07122 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -23,9 +23,9 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -34,16 +34,13 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -154,18 +151,12 @@ public class DummyEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+	public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index c3ed6c0..eb55c4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -45,11 +46,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
@@ -315,18 +312,12 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+	public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7bc2c29..e2abe88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,11 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -36,19 +34,20 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
-
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-
 import org.apache.flink.util.SerializedValue;
 import org.junit.Before;
 import org.junit.Test;
@@ -217,9 +216,9 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(long checkpointId, long timestamp) {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
 			lastCheckpointId++;
-			if (checkpointId == lastCheckpointId) {
+			if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
 				if (lastCheckpointId == NUM_CALLS) {
 					triggerLatch.trigger();
 				}
@@ -234,8 +233,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(
-				long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
 			throw new UnsupportedOperationException("Should not be called");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 4b6cac7..5b18c75 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -122,6 +123,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 */
 	public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
 		this.topics = checkNotNull(topics);
+		checkArgument(topics.size() > 0, "You have to define at least one topic.");
 		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
 	}
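
The added checkArgument makes construction with an empty topic list fail fast. A minimal sketch of what the guard does, assuming only Preconditions from flink-core:

    import static org.apache.flink.util.Preconditions.checkArgument;
    import static org.apache.flink.util.Preconditions.checkNotNull;

    import java.util.Collections;
    import java.util.List;

    List<String> topics = checkNotNull(Collections.<String>emptyList());
    // throws IllegalArgumentException("You have to define at least one topic.")
    checkArgument(topics.size() > 0, "You have to define at least one topic.");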
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 76a69bc..6d2dc70 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -36,6 +36,7 @@ import org.mockito.Matchers;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -159,7 +160,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		assertFalse(listState.get().iterator().hasNext());
 	}
-	
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testSnapshotState() throws Exception {
@@ -177,9 +178,9 @@ public class FlinkKafkaConsumerBaseTest {
 
 		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
 		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
-			
+
 		final LinkedMap pendingCheckpoints = new LinkedMap();
-	
+
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingCheckpoints, true);
 		assertEquals(0, pendingCheckpoints.size());
 
@@ -222,7 +223,7 @@ public class FlinkKafkaConsumerBaseTest {
 		assertEquals(state2, snapshot2);
 		assertEquals(2, pendingCheckpoints.size());
 		assertEquals(state2, pendingCheckpoints.get(140L));
-		
+
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
 		assertEquals(1, pendingCheckpoints.size());
@@ -241,7 +242,7 @@ public class FlinkKafkaConsumerBaseTest {
 		assertEquals(state3, snapshot3);
 		assertEquals(2, pendingCheckpoints.size());
 		assertEquals(state3, pendingCheckpoints.get(141L));
-		
+
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
 		assertEquals(0, pendingCheckpoints.size());
@@ -302,7 +303,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		@SuppressWarnings("unchecked")
 		public DummyFlinkKafkaConsumer() {
-			super(Arrays.asList("abc", "def"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
+			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index d60c999..7f01129 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,20 +17,20 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayDeque;
+
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
  * all inputs have received the barrier for a given checkpoint.
@@ -219,9 +219,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			releaseBlocks();
 
 			if (toNotifyOnCheckpoint != null) {
-				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
-						receivedBarrier.getId(), receivedBarrier.getTimestamp(),
-						bufferSpiller.getBytesWritten(), latestAlignmentDurationNanos);
+				CheckpointMetaData checkpointMetaData =
+						new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+				checkpointMetaData.
+						setBytesBufferedInAlignment(bufferSpiller.getBytesWritten()).
+						setAlignmentDurationNanos(latestAlignmentDurationNanos);
+
+				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
 			}
 		}
 	}
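
The same construction pattern appears again in BarrierTracker below: the barrier's id and timestamp seed the CheckpointMetaData, and the alignment metrics are attached through the chained setters before the task is notified. A condensed sketch with placeholder values (7L and the current time stand in for receivedBarrier.getId() / getTimestamp(); toNotifyOnCheckpoint is the handler's StatefulTask reference from the hunk above):

    CheckpointMetaData checkpointMetaData =
            new CheckpointMetaData(7L, System.currentTimeMillis());
    checkpointMetaData
            .setBytesBufferedInAlignment(0L)
            .setAlignmentDurationNanos(0L);
    toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);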

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 1db5845..86945a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,10 +19,11 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.util.ArrayDeque;
 
@@ -115,8 +116,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
 			if (toNotifyOnCheckpoint != null) {
-				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
-						receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
+				CheckpointMetaData checkpointMetaData =
+						new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+
+				checkpointMetaData.
+						setBytesBufferedInAlignment(0L).
+						setAlignmentDurationNanos(0L);
+
+				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
 			}
 			return;
 		}
@@ -149,8 +156,13 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 				
 				// notify the listener
 				if (toNotifyOnCheckpoint != null) {
-					toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
-							receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
+					CheckpointMetaData checkpointMetaData =
+							new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+					checkpointMetaData.
+							setBytesBufferedInAlignment(0L).
+							setAlignmentDurationNanos(0L);
+
+					toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
 				}
 			}
 		}


[2/4] flink git commit: [hotfix] Make KeyGroupsStateHandle implement StreamStateHandle

Posted by uc...@apache.org.
[hotfix] Make KeyGroupsStateHandle implement StreamStateHandle


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fee14309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fee14309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fee14309

Branch: refs/heads/master
Commit: fee143099a5ac7220622617c79abae60786555e4
Parents: 98710ea
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Oct 2 14:15:48 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 6 17:04:09 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  2 +-
 .../savepoint/SavepointV1Serializer.java        |  2 +-
 .../runtime/state/KeyGroupsStateHandle.java     | 20 +++++++++++---------
 .../state/heap/HeapKeyedStateBackend.java       |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  7 +++----
 5 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 126ebd2..7ab35c4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -624,7 +624,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private void restoreKeyGroupsInStateHandle()
 				throws IOException, RocksDBException, ClassNotFoundException {
 			try {
-				currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream();
+				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
 				rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
 				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
 				restoreKVStateMetaData();

http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 536062a..f120e1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -205,7 +205,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 			for (int keyGroup : stateHandle.keyGroups()) {
 				dos.writeLong(stateHandle.getOffsetForKeyGroup(keyGroup));
 			}
-			serializeStreamStateHandle(stateHandle.getStateHandle(), dos);
+			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
 		} else {
 			dos.writeByte(NULL_HANDLE);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index ea12808..1d277b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -28,7 +29,7 @@ import java.io.IOException;
  * consists of a range of key group snapshots. A key group is subset of the available
  * key space. The key groups are identified by their key group indices.
  */
-public class KeyGroupsStateHandle implements StateObject {
+public class KeyGroupsStateHandle implements StreamStateHandle {
 
 	private static final long serialVersionUID = -8070326169926626355L;
 
@@ -104,14 +105,6 @@ public class KeyGroupsStateHandle implements StateObject {
 		return groupRangeOffsets.getKeyGroupRange().getNumberOfKeyGroups();
 	}
 
-	/**
-	 *
-	 * @return the inner stream state handle to the actual key-group states
-	 */
-	public StreamStateHandle getStateHandle() {
-		return stateHandle;
-	}
-
 	@Override
 	public void discardState() throws Exception {
 		stateHandle.discardState();
@@ -123,6 +116,15 @@ public class KeyGroupsStateHandle implements StateObject {
 	}
 
 	@Override
+	public FSDataInputStream openInputStream() throws IOException {
+		return stateHandle.openInputStream();
+	}
+
+	public StreamStateHandle getDelegateStateHandle() {
+		return stateHandle;
+	}
+
+	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
 			return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a766373..040677b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -259,7 +259,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			try {
 
-				fsDataInputStream = keyGroupsHandle.getStateHandle().openInputStream();
+				fsDataInputStream = keyGroupsHandle.openInputStream();
 				cancelStreamRegistry.registerClosable(fsDataInputStream);
 
 				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
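
With KeyGroupsStateHandle implementing StreamStateHandle, call sites read key-group state from the handle directly instead of first unwrapping the inner stream handle, as the two backend changes above show. A sketch of the reading pattern, assuming a restored handle named keyGroupsHandle and a key-group index keyGroup from the surrounding restore loop (the real backends register the stream with their cancel registry instead of using try-with-resources):

    try (FSDataInputStream in = keyGroupsHandle.openInputStream()) {
        in.seek(keyGroupsHandle.getOffsetForKeyGroup(keyGroup));
        // deserialize that key group's state from 'in'
    }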

http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 5fb0e6f..972f0ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2474,7 +2474,7 @@ public class CheckpointCoordinatorTest {
 
 		assertEquals(expectedTotalKeyGroups, actualTotalKeyGroups);
 
-		try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.getStateHandle().openInputStream()) {
+		try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream()) {
 			for (int groupId : expectedHeadOpKeyGroupStateHandle.keyGroups()) {
 				long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
 				inputStream.seek(offset);
@@ -2483,9 +2483,8 @@ public class CheckpointCoordinatorTest {
 				for (KeyGroupsStateHandle oneActualKeyGroupStateHandle : actualPartitionedKeyGroupState) {
 					if (oneActualKeyGroupStateHandle.containsKeyGroup(groupId)) {
 						long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
-						try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.
-								getStateHandle().openInputStream()) {
-
+						try (FSDataInputStream actualInputStream =
+								     oneActualKeyGroupStateHandle.openInputStream()) {
 							actualInputStream.seek(actualOffset);
 
 							int actualGroupState = InstantiationUtil.


[3/4] flink git commit: [FLINK-4730] Introducing CheckpointMetaData

Posted by uc...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ff14249..8ada6d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -46,9 +47,9 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -558,9 +559,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
 		try {
-			return performCheckpoint(checkpointId, timestamp, 0L, 0L);
+			checkpointMetaData.
+					setBytesBufferedInAlignment(0L).
+					setAlignmentDurationNanos(0L);
+			return performCheckpoint(checkpointMetaData);
 		}
 		catch (Exception e) {
 			// propagate exceptions only if the task is still in "running" state
@@ -573,11 +577,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public void triggerCheckpointOnBarrier(
-			long checkpointId, long timestamp, long bytesAligned, long alignmentDurationNanos) throws Exception {
+	public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
 
 		try {
-			performCheckpoint(checkpointId, timestamp, bytesAligned, alignmentDurationNanos);
+			performCheckpoint(checkpointMetaData);
 		}
 		catch (CancelTaskException e) {
 			throw e;
@@ -587,8 +590,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	private boolean performCheckpoint(
-			long checkpointId, long timestamp, long bytesBufferedAlignment, long alignmentDurationNanos) throws Exception {
+	private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+
+		long checkpointId = checkpointMetaData.getCheckpointId();
+		long timestamp = checkpointMetaData.getTimestamp();
 
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
 
@@ -674,8 +679,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 				LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
 
-				final long endOfSyncPart = System.nanoTime();
-				final long syncDurationMillis = (endOfSyncPart - startOfSyncPart) / 1_000_000;
+				final long syncEndNanos = System.nanoTime();
+				final long syncDurationMillis = (syncEndNanos - startOfSyncPart) / 1_000_000;
+
+				checkpointMetaData.setSyncDurationMillis(syncDurationMillis);
 
 				AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
 						"checkpoint-" + checkpointId + "-" + timestamp,
@@ -684,11 +691,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						chainedNonPartitionedStateHandles,
 						chainedPartitionedStateHandles,
 						keyGroupsStateHandleFuture,
-						checkpointId,
-						bytesBufferedAlignment,
-						alignmentDurationNanos,
-						syncDurationMillis,
-						endOfSyncPart);
+						checkpointMetaData,
+						syncEndNanos);
 
 				cancelables.registerClosable(asyncCheckpointRunnable);
 				asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
@@ -696,7 +700,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("{} - finished synchronous part of checkpoint {}." +
 							"Alignment duration: {} ms, snapshot duration {} ms",
-							getName(), checkpointId, alignmentDurationNanos / 1_000_000, syncDurationMillis);
+							getName(), checkpointId, checkpointMetaData.getAlignmentDurationNanos() / 1_000_000, syncDurationMillis);
 				}
 
 				return true;
@@ -914,15 +918,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		private final RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture;
 
-		private final long checkpointId;
-
 		private final String name;
 
-		private final long bytesBufferedInAlignment;
-
-		private final long alignmentDurationNanos;
-
-		private final long syncDurationMillies;
+		private final CheckpointMetaData checkpointMetaData;
 
 		private final long asyncStartNanos;
 
@@ -933,11 +931,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles,
 				ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles,
 				RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture,
-				long checkpointId,
-				long bytesBufferedInAlignment,
-				long alignmentDurationNanos,
-				long syncDurationMillies,
-				long asyncStartNanos) {
+				CheckpointMetaData checkpointMetaData,
+				long asyncStartNanos
+		) {
 
 			this.name = name;
 			this.owner = owner;
@@ -945,10 +941,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			this.nonPartitionedStateHandles = nonPartitionedStateHandles;
 			this.partitioneableStateHandles = partitioneableStateHandles;
 			this.keyGroupsStateHandleFuture = keyGroupsStateHandleFuture;
-			this.checkpointId = checkpointId;
-			this.bytesBufferedInAlignment = bytesBufferedInAlignment;
-			this.alignmentDurationNanos = alignmentDurationNanos;
-			this.syncDurationMillies = syncDurationMillies;
+			this.checkpointMetaData = checkpointMetaData;
 			this.asyncStartNanos = asyncStartNanos;
 		}
 
@@ -974,26 +967,22 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				final long asyncEndNanos = System.nanoTime();
 				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
 
+				checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis);
+
 				if (nonPartitionedStateHandles.isEmpty() && partitioneableStateHandles.isEmpty() && keyedStates.isEmpty()) {
-					owner.getEnvironment().acknowledgeCheckpoint(
-							checkpointId,
-							syncDurationMillies, asyncDurationMillis,
-							bytesBufferedInAlignment, alignmentDurationNanos);
+					owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData);
 				} else {
 					CheckpointStateHandles allStateHandles = new CheckpointStateHandles(
 							nonPartitionedStateHandles,
 							partitioneableStateHandles,
 							keyedStates);
 
-					owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
-							allStateHandles,
-							syncDurationMillies, asyncDurationMillis,
-							bytesBufferedInAlignment, alignmentDurationNanos);
+					owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, allStateHandles);
 				}
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", 
-							owner.getName(), checkpointId, asyncDurationMillis);
+							owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
 				}
 			}
 			catch (Exception e) {
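
Taken together, the StreamTask changes make CheckpointMetaData the single carrier for a checkpoint's identity and its timing metrics: the barrier handler (or the trigger path) fills in the alignment figures, the synchronous part records its duration, and the asynchronous runnable adds its own duration before acknowledging. A condensed sketch of that flow, reusing the setter and getter names from the hunks above (all values and allStateHandles are placeholders):

    CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
    checkpointMetaData
            .setBytesBufferedInAlignment(bytesBuffered)
            .setAlignmentDurationNanos(alignmentNanos);          // barrier handler / trigger path
    checkpointMetaData.setSyncDurationMillis(syncDurationMillis);    // after the synchronous part
    checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis);  // in AsyncCheckpointRunnable
    getEnvironment().acknowledgeCheckpoint(checkpointMetaData, allStateHandles);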

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 5d68841..2cabd70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -981,19 +982,17 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
 			throw new UnsupportedOperationException("should never be called");
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(
-				long checkpointId, long timestamp,
-				long bytesAligned, long alignmentTimeNanos) throws Exception {
-
-			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
-			assertTrue(timestamp > 0);
-			assertTrue(bytesAligned >= 0);
-			assertTrue(alignmentTimeNanos >= 0);
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+
+			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());
+			assertTrue(checkpointMetaData.getTimestamp() > 0);
+			assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0);
+			assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0);
 
 			nextExpectedCheckpointId++;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index f2f9092..b6d0450 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
@@ -374,18 +375,16 @@ public class BarrierTrackerTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
 			throw new UnsupportedOperationException("should never be called");
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(
-				long checkpointId, long timestamp,
-				long bytesAligned, long alignmentTimeNanos) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
 
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-			assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointId);
-			assertTrue(timestamp > 0);
+			assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointMetaData.getCheckpointId());
+			assertTrue(checkpointMetaData.getTimestamp() > 0);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index f6e7dca..1b2b723 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -374,7 +375,9 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke(env);
 		testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());
 
-		while(!streamTask.triggerCheckpoint(checkpointId, checkpointTimestamp));
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
+
+		while(!streamTask.triggerCheckpoint(checkpointMetaData));
 
 		// since no state was set, there shouldn't be restore calls
 		assertEquals(0, TestingStreamOperator.numberRestoreCalls);
@@ -517,11 +520,10 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		@Override
 		public void acknowledgeCheckpoint(
-				long checkpointId,
-				CheckpointStateHandles checkpointStateHandles,
-				long syncDuration, long asymcDuration, long alignmentByte, long alignmentDuration) {
+				CheckpointMetaData checkpointMetaData,
+				CheckpointStateHandles checkpointStateHandles) {
 
-			this.checkpointId = checkpointId;
+			this.checkpointId = checkpointMetaData.getCheckpointId();
 			if(checkpointStateHandles != null) {
 				this.state = checkpointStateHandles.getNonPartitionedStateHandles();
 				this.keyGroupStates = checkpointStateHandles.getKeyGroupsStateHandle();

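The acknowledgement side of the same refactoring, as exercised by the mock above, can be sketched as follows. The recorder class is illustrative and not part of the commit; only the two-argument acknowledgeCheckpoint(CheckpointMetaData, CheckpointStateHandles) shape and the getCheckpointId() accessor are taken from the diff.

import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.state.CheckpointStateHandles;

// Illustrative recorder: the checkpoint id now travels inside the metadata
// object rather than as a separate long next to the state handles.
final class AcknowledgeRecorderSketch {

	private long lastCheckpointId = -1L;
	private CheckpointStateHandles lastState;

	void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData,
			CheckpointStateHandles checkpointStateHandles) {
		this.lastCheckpointId = checkpointMetaData.getCheckpointId();
		if (checkpointStateHandles != null) {
			this.lastState = checkpointStateHandles;
		}
	}
}
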
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 1e62e28..10fc400 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -29,11 +30,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -235,7 +234,8 @@ public class SourceStreamTaskTest {
 		public Boolean call() throws Exception {
 			for (int i = 0; i < numCheckpoints; i++) {
 				long currentCheckpointId = checkpointId.getAndIncrement();
-				sourceTask.triggerCheckpoint(currentCheckpointId, 0L);
+				CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
+				sourceTask.triggerCheckpoint(checkpointMetaData);
 				Thread.sleep(checkpointInterval);
 			}
 			return true;

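The periodic-trigger pattern in the Checkpointer callable above boils down to the sketch below. The Triggerable interface is a hypothetical stand-in for the task under test; only the CheckpointMetaData(long, long) constructor and the triggerCheckpoint(CheckpointMetaData) signature come from this commit.

import java.util.concurrent.Callable;

import org.apache.flink.runtime.checkpoint.CheckpointMetaData;

// Illustrative periodic checkpoint trigger: id and timestamp are bundled into
// one metadata object before handing the request to the task.
final class PeriodicCheckpointerSketch implements Callable<Boolean> {

	// Hypothetical stand-in for the task side of triggerCheckpoint.
	interface Triggerable {
		boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception;
	}

	private final Triggerable task;
	private final int numCheckpoints;
	private final long checkpointIntervalMillis;

	PeriodicCheckpointerSketch(Triggerable task, int numCheckpoints, long checkpointIntervalMillis) {
		this.task = task;
		this.numCheckpoints = numCheckpoints;
		this.checkpointIntervalMillis = checkpointIntervalMillis;
	}

	@Override
	public Boolean call() throws Exception {
		for (int i = 0; i < numCheckpoints; i++) {
			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(i, 0L);
			task.triggerCheckpoint(checkpointMetaData);
			Thread.sleep(checkpointIntervalMillis);
		}
		return true;
	}
}
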
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9b773d8..f852682 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -43,18 +43,15 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -311,18 +308,12 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+	public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
 	}
 
 	@Override