You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/22 11:51:32 UTC

[4/4] flink git commit: [FLINK-5763] [checkpoints] Move CheckpointMetrics out of CheckpointMetaData

[FLINK-5763] [checkpoints] Move CheckpointMetrics out of CheckpointMetaData


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cb8cde4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cb8cde4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cb8cde4

Branch: refs/heads/master
Commit: 1cb8cde48e054395d808f6fe985ae60648e0b6b5
Parents: 2edc971
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Feb 15 18:16:44 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 22 12:14:55 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointMetaData.java  | 81 +-------------------
 .../runtime/checkpoint/CheckpointMetrics.java   | 20 ++++-
 .../runtime/jobgraph/tasks/StatefulTask.java    |  6 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  2 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  3 +-
 .../streaming/runtime/io/BarrierBuffer.java     |  5 +-
 .../streaming/runtime/io/BarrierTracker.java    | 10 +--
 .../streaming/runtime/tasks/OperatorChain.java  |  1 -
 .../streaming/runtime/tasks/StreamTask.java     | 49 ++++++------
 .../io/BarrierBufferAlignmentLimitTest.java     |  8 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 35 +++++----
 .../runtime/io/BarrierTrackerTest.java          |  3 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |  3 +-
 13 files changed, 87 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
index 2627b22..9960b44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.util.Preconditions;
-
 import java.io.Serializable;
 
 /**
@@ -35,65 +33,9 @@ public class CheckpointMetaData implements Serializable {
 	/** The timestamp of the checkpoint */
 	private final long timestamp;
 
-	private final CheckpointMetrics metrics;
-
 	public CheckpointMetaData(long checkpointId, long timestamp) {
 		this.checkpointId = checkpointId;
 		this.timestamp = timestamp;
-		this.metrics = new CheckpointMetrics();
-	}
-
-	public CheckpointMetaData(
-			long checkpointId,
-			long timestamp,
-			long synchronousDurationMillis,
-			long asynchronousDurationMillis,
-			long bytesBufferedInAlignment,
-			long alignmentDurationNanos) {
-		this.checkpointId = checkpointId;
-		this.timestamp = timestamp;
-		this.metrics = new CheckpointMetrics(
-				bytesBufferedInAlignment,
-				alignmentDurationNanos,
-				synchronousDurationMillis,
-				asynchronousDurationMillis);
-	}
-
-	public CheckpointMetaData(
-			long checkpointId,
-			long timestamp,
-			CheckpointMetrics metrics) {
-		this.checkpointId = checkpointId;
-		this.timestamp = timestamp;
-		this.metrics = Preconditions.checkNotNull(metrics);
-	}
-
-	public CheckpointMetrics getMetrics() {
-		return metrics;
-	}
-
-	public CheckpointMetaData setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
-		Preconditions.checkArgument(bytesBufferedInAlignment >= 0);
-		this.metrics.setBytesBufferedInAlignment(bytesBufferedInAlignment);
-		return this;
-	}
-
-	public CheckpointMetaData setAlignmentDurationNanos(long alignmentDurationNanos) {
-		Preconditions.checkArgument(alignmentDurationNanos >= 0);
-		this.metrics.setAlignmentDurationNanos(alignmentDurationNanos);
-		return this;
-	}
-
-	public CheckpointMetaData setSyncDurationMillis(long syncDurationMillis) {
-		Preconditions.checkArgument(syncDurationMillis >= 0);
-		this.metrics.setSyncDurationMillis(syncDurationMillis);
-		return this;
-	}
-
-	public CheckpointMetaData setAsyncDurationMillis(long asyncDurationMillis) {
-		Preconditions.checkArgument(asyncDurationMillis >= 0);
-		this.metrics.setAsyncDurationMillis(asyncDurationMillis);
-		return this;
 	}
 
 	public long getCheckpointId() {
@@ -104,22 +46,6 @@ public class CheckpointMetaData implements Serializable {
 		return timestamp;
 	}
 
-	public long getBytesBufferedInAlignment() {
-		return metrics.getBytesBufferedInAlignment();
-	}
-
-	public long getAlignmentDurationNanos() {
-		return metrics.getAlignmentDurationNanos();
-	}
-
-	public long getSyncDurationMillis() {
-		return metrics.getSyncDurationMillis();
-	}
-
-	public long getAsyncDurationMillis() {
-		return metrics.getAsyncDurationMillis();
-	}
-
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -132,15 +58,13 @@ public class CheckpointMetaData implements Serializable {
 		CheckpointMetaData that = (CheckpointMetaData) o;
 
 		return (checkpointId == that.checkpointId)
-				&& (timestamp == that.timestamp)
-				&& (metrics.equals(that.metrics));
+				&& (timestamp == that.timestamp);
 	}
 
 	@Override
 	public int hashCode() {
 		int result = (int) (checkpointId ^ (checkpointId >>> 32));
 		result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-		result = 31 * result + metrics.hashCode();
 		return result;
 	}
 
@@ -149,7 +73,6 @@ public class CheckpointMetaData implements Serializable {
 		return "CheckpointMetaData{" +
 				"checkpointId=" + checkpointId +
 				", timestamp=" + timestamp +
-				", metrics=" + metrics +
 				'}';
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
index be73adb..a90a2e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 import java.io.Serializable;
 
 /**
@@ -49,6 +51,12 @@ public class CheckpointMetrics implements Serializable {
 			long syncDurationMillis,
 			long asyncDurationMillis) {
 
+		// these may be "-1", in case the values are unknown or not set
+		checkArgument(syncDurationMillis >= -1);
+		checkArgument(asyncDurationMillis >= -1);
+		checkArgument(bytesBufferedInAlignment >= -1);
+		checkArgument(alignmentDurationNanos >= -1);
+
 		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
 		this.alignmentDurationNanos = alignmentDurationNanos;
 		this.syncDurationMillis = syncDurationMillis;
@@ -59,32 +67,36 @@ public class CheckpointMetrics implements Serializable {
 		return bytesBufferedInAlignment;
 	}
 
-	public void setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
+	public CheckpointMetrics setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
 		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+		return this;
 	}
 
 	public long getAlignmentDurationNanos() {
 		return alignmentDurationNanos;
 	}
 
-	public void setAlignmentDurationNanos(long alignmentDurationNanos) {
+	public CheckpointMetrics setAlignmentDurationNanos(long alignmentDurationNanos) {
 		this.alignmentDurationNanos = alignmentDurationNanos;
+		return this;
 	}
 
 	public long getSyncDurationMillis() {
 		return syncDurationMillis;
 	}
 
-	public void setSyncDurationMillis(long syncDurationMillis) {
+	public CheckpointMetrics setSyncDurationMillis(long syncDurationMillis) {
 		this.syncDurationMillis = syncDurationMillis;
+		return this;
 	}
 
 	public long getAsyncDurationMillis() {
 		return asyncDurationMillis;
 	}
 
-	public void setAsyncDurationMillis(long asyncDurationMillis) {
+	public CheckpointMetrics setAsyncDurationMillis(long asyncDurationMillis) {
 		this.asyncDurationMillis = asyncDurationMillis;
+		return this;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 39ddc961..87b66ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.state.TaskStateHandles;
 
 /**
@@ -41,7 +42,7 @@ public interface StatefulTask {
 	 * 
 	 * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
 	 * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
-	 * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData)}
+	 * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)}
 	 * method.
 	 *
 	 * @param checkpointMetaData Meta data for about this checkpoint
@@ -55,10 +56,11 @@ public interface StatefulTask {
 	 * barriers on all input streams.
 	 * 
 	 * @param checkpointMetaData Meta data for about this checkpoint
+	 * @param checkpointMetrics Metrics about this checkpoint
 	 * 
 	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
 	 */
-	void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception;
+	void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception;
 
 	/**
 	 * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index c7c35ec..de54d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -619,7 +619,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
 			throw new UnsupportedOperationException("should not be called!");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 26b8cdb..187163d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -242,7 +243,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
 			throw new UnsupportedOperationException("Should not be called");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index e91c26a..611bd44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
@@ -363,11 +364,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
 			long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L;
 
-			checkpointMetaData
+			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
 					.setBytesBufferedInAlignment(bytesBuffered)
 					.setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
-			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9351f1b..77608c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -250,12 +251,11 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
 		if (toNotifyOnCheckpoint != null) {
 			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
+				.setBytesBufferedInAlignment(0L)
+				.setAlignmentDurationNanos(0L);
 
-			checkpointMetaData
-					.setBytesBufferedInAlignment(0L)
-					.setAlignmentDurationNanos(0L);
-
-			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 7e24eea..591ed3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -162,7 +162,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 	}
 
-
 	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
 		try {
 			CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 92fc6e5..60afd60 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -516,10 +517,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	@Override
 	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
 		try {
-			checkpointMetaData.
-					setBytesBufferedInAlignment(0L).
-					setAlignmentDurationNanos(0L);
-			return performCheckpoint(checkpointMetaData);
+			// No alignment if we inject a checkpoint
+			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
+					.setBytesBufferedInAlignment(0L)
+					.setAlignmentDurationNanos(0L);
+
+			return performCheckpoint(checkpointMetaData, checkpointMetrics);
 		}
 		catch (Exception e) {
 			// propagate exceptions only if the task is still in "running" state
@@ -535,9 +538,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+	public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
 		try {
-			performCheckpoint(checkpointMetaData);
+			performCheckpoint(checkpointMetaData, checkpointMetrics);
 		}
 		catch (CancelTaskException e) {
 			throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
@@ -562,7 +565,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+	private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
 		LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
 
 		synchronized (lock) {
@@ -576,7 +579,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				operatorChain.broadcastCheckpointBarrier(
 						checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
 
-				checkpointState(checkpointMetaData);
+				checkpointState(checkpointMetaData, checkpointMetrics);
 				return true;
 			}
 			else {
@@ -629,8 +632,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	private void checkpointState(CheckpointMetaData checkpointMetaData) throws Exception {
-		CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData);
+	private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+		CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointMetrics);
 		checkpointingOperation.executeCheckpointing();
 	}
 
@@ -868,6 +871,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		private List<StreamStateHandle> nonPartitionedStateHandles;
 
 		private final CheckpointMetaData checkpointMetaData;
+		private final CheckpointMetrics checkpointMetrics;
 
 		private final long asyncStartNanos;
 
@@ -879,11 +883,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				List<StreamStateHandle> nonPartitionedStateHandles,
 				List<OperatorSnapshotResult> snapshotInProgressList,
 				CheckpointMetaData checkpointMetaData,
+				CheckpointMetrics checkpointMetrics,
 				long asyncStartNanos) {
 
 			this.owner = Preconditions.checkNotNull(owner);
 			this.snapshotInProgressList = Preconditions.checkNotNull(snapshotInProgressList);
 			this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+			this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
 			this.nonPartitionedStateHandles = nonPartitionedStateHandles;
 			this.asyncStartNanos = asyncStartNanos;
 
@@ -900,9 +906,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		@Override
 		public void run() {
-
 			try {
-
 				// Keyed state handle future, currently only one (the head) operator can have this
 				KeyGroupsStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
 				KeyGroupsStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
@@ -925,7 +929,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				final long asyncEndNanos = System.nanoTime();
 				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
 
-				checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis);
+				checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
 
 				ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState =
 						new ChainedStateHandle<>(nonPartitionedStateHandles);
@@ -946,7 +950,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
 					owner.getEnvironment().acknowledgeCheckpoint(
 						checkpointMetaData.getCheckpointId(),
-						checkpointMetaData.getMetrics(),
+						checkpointMetrics,
 						subtaskState);
 
 					if (LOG.isDebugEnabled()) {
@@ -1039,6 +1043,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		private final StreamTask<?, ?> owner;
 
 		private final CheckpointMetaData checkpointMetaData;
+		private final CheckpointMetrics checkpointMetrics;
 
 		private final StreamOperator<?>[] allOperators;
 
@@ -1050,21 +1055,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		private final List<StreamStateHandle> nonPartitionedStates;
 		private final List<OperatorSnapshotResult> snapshotInProgressList;
 
-		public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData) {
+		public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
 			this.owner = Preconditions.checkNotNull(owner);
 			this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+			this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
 			this.allOperators = owner.operatorChain.getAllOperators();
 			this.nonPartitionedStates = new ArrayList<>(allOperators.length);
 			this.snapshotInProgressList = new ArrayList<>(allOperators.length);
 		}
 
 		public void executeCheckpointing() throws Exception {
-
 			startSyncPartNano = System.nanoTime();
 
 			boolean failed = true;
 			try {
-
 				for (StreamOperator<?> op : allOperators) {
 					checkpointStreamOperator(op);
 				}
@@ -1076,7 +1080,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 				startAsyncPartNano = System.nanoTime();
 
-				checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
+				checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
 
 				// at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
 				runAsyncCheckpointingAndAcknowledge();
@@ -1086,8 +1090,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					LOG.debug("{} - finished synchronous part of checkpoint {}." +
 							"Alignment duration: {} ms, snapshot duration {} ms",
 						owner.getName(), checkpointMetaData.getCheckpointId(),
-						checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
-						checkpointMetaData.getSyncDurationMillis());
+						checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
+						checkpointMetrics.getSyncDurationMillis());
 				}
 			} finally {
 				if (failed) {
@@ -1118,8 +1122,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
 								"Alignment duration: {} ms, snapshot duration {} ms",
 							owner.getName(), checkpointMetaData.getCheckpointId(),
-							checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
-							checkpointMetaData.getSyncDurationMillis());
+							checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
+							checkpointMetrics.getSyncDurationMillis());
 					}
 				}
 			}
@@ -1152,6 +1156,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					nonPartitionedStates,
 					snapshotInProgressList,
 					checkpointMetaData,
+					checkpointMetrics,
 					startAsyncPartNano);
 
 			owner.cancelables.registerClosable(asyncCheckpointRunnable);

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 3e618ef..46f228a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -154,7 +155,8 @@ public class BarrierBufferAlignmentLimitTest {
 		check(sequence[21], buffer.getNextNonBlocked());
 
 		// no call for a completed checkpoint must have happened
-		verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class),
+			any(CheckpointMetrics.class));
 
 		assertNull(buffer.getNextNonBlocked());
 		assertNull(buffer.getNextNonBlocked());
@@ -240,7 +242,7 @@ public class BarrierBufferAlignmentLimitTest {
 		// checkpoint 4 completed - check and validate buffered replay
 		check(sequence[9], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), any(CheckpointMetrics.class));
 
 		check(sequence[10], buffer.getNextNonBlocked());
 		check(sequence[15], buffer.getNextNonBlocked());
@@ -252,7 +254,7 @@ public class BarrierBufferAlignmentLimitTest {
 		check(sequence[21], buffer.getNextNonBlocked());
 
 		// only checkpoint 4 was successfully completed, not checkpoint 3
-		verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
 
 		assertNull(buffer.getNextNonBlocked());
 		assertNull(buffer.getNextNonBlocked());

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d17225c..869d1fe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -565,7 +566,7 @@ public class BarrierBufferTest {
 			// checkpoint done - replay buffered
 			check(sequence[5], buffer.getNextNonBlocked());
 			validateAlignmentTime(startTs, buffer);
-			verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+			verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
 			check(sequence[6], buffer.getNextNonBlocked());
 
 			check(sequence[9], buffer.getNextNonBlocked());
@@ -1007,14 +1008,14 @@ public class BarrierBufferTest {
 
 		check(sequence[0], buffer.getNextNonBlocked());
 		check(sequence[2], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[6], buffer.getNextNonBlocked());
 		assertEquals(5L, buffer.getCurrentCheckpointId());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[8], buffer.getNextNonBlocked());
@@ -1077,7 +1078,7 @@ public class BarrierBufferTest {
 		check(sequence[2], buffer.getNextNonBlocked());
 		startTs = System.nanoTime();
 		check(sequence[5], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 
 		check(sequence[6], buffer.getNextNonBlocked());
@@ -1096,7 +1097,7 @@ public class BarrierBufferTest {
 		check(sequence[16], buffer.getNextNonBlocked());
 		startTs = System.nanoTime();
 		check(sequence[20], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 		check(sequence[21], buffer.getNextNonBlocked());
 
@@ -1113,7 +1114,7 @@ public class BarrierBufferTest {
 		// a simple successful checkpoint
 		startTs = System.nanoTime();
 		check(sequence[32], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 		check(sequence[33], buffer.getNextNonBlocked());
 
@@ -1174,7 +1175,7 @@ public class BarrierBufferTest {
 
 		// finished first checkpoint
 		check(sequence[3], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 
 		check(sequence[5], buffer.getNextNonBlocked());
@@ -1197,7 +1198,7 @@ public class BarrierBufferTest {
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		// no further checkpoint (abort) notifications
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
 
 		// all done
@@ -1279,7 +1280,7 @@ public class BarrierBufferTest {
 		// checkpoint done
 		check(sequence[7], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
+		verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
 
 		// queued data
 		check(sequence[10], buffer.getNextNonBlocked());
@@ -1298,7 +1299,7 @@ public class BarrierBufferTest {
 		checkNoTempFilesRemain();
 
 		// check overall notifications
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
@@ -1363,7 +1364,7 @@ public class BarrierBufferTest {
 		// checkpoint finished
 		check(sequence[7], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
 		check(sequence[11], buffer.getNextNonBlocked());
 
 		// remaining data
@@ -1379,7 +1380,7 @@ public class BarrierBufferTest {
 		checkNoTempFilesRemain();
 
 		// check overall notifications
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
@@ -1491,17 +1492,17 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
 			assertTrue("wrong checkpoint id",
 					nextExpectedCheckpointId == -1L || 
 					nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());
 
 			assertTrue(checkpointMetaData.getTimestamp() > 0);
-			assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0);
-			assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0);
+			assertTrue(checkpointMetrics.getBytesBufferedInAlignment() >= 0);
+			assertTrue(checkpointMetrics.getAlignmentDurationNanos() >= 0);
 
 			nextExpectedCheckpointId++;
-			lastReportedBytesBufferedInAlignment = checkpointMetaData.getBytesBufferedInAlignment();
+			lastReportedBytesBufferedInAlignment = checkpointMetrics.getBytesBufferedInAlignment();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 0d9e6ac..da322f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -506,7 +507,7 @@ public class BarrierTrackerTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
 
 			final long expectedId = checkpointIDs[i++];

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 492b470..5c0f0cf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -275,7 +276,7 @@ public class BlockingCheckpointsTest {
 
 		@Override
 		protected void run() throws Exception {
-			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()));
+			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), new CheckpointMetrics());
 		}
 
 		@Override