You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/22 11:51:29 UTC
[1/4] flink git commit: [FLINK-5877] [docs] Fix Async I/O Scala
snippet
Repository: flink
Updated Branches:
refs/heads/master 45e01cf23 -> fcc1efcb0
[FLINK-5877] [docs] Fix Async I/O Scala snippet
This closes #3383
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d2ffbad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d2ffbad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d2ffbad
Branch: refs/heads/master
Commit: 2d2ffbad9684e879aa92473798701b7cfc0d1277
Parents: 45e01cf
Author: Andrea Sella <an...@radicalbit.io>
Authored: Tue Feb 21 21:18:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 22 11:28:51 2017 +0100
----------------------------------------------------------------------
docs/dev/stream/asyncio.md | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d2ffbad/docs/dev/stream/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/asyncio.md b/docs/dev/stream/asyncio.md
index dbf2b9c..c4414b4 100644
--- a/docs/dev/stream/asyncio.md
+++ b/docs/dev/stream/asyncio.md
@@ -139,7 +139,7 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)
/** The context used for the future callbacks */
- implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()))
+ implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {
@@ -150,8 +150,8 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector
resultFuture.onSuccess {
- case result: String => asyncCollector.collect(Collections.singleton((str, result)));
- })
+ case result: String => asyncCollector.collect(Iterable((str, result)));
+ }
}
}
[4/4] flink git commit: [FLINK-5763] [checkpoints] Move
CheckpointMetrics out of CheckpointMetaData
Posted by se...@apache.org.
[FLINK-5763] [checkpoints] Move CheckpointMetrics out of CheckpointMetaData
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cb8cde4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cb8cde4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cb8cde4
Branch: refs/heads/master
Commit: 1cb8cde48e054395d808f6fe985ae60648e0b6b5
Parents: 2edc971
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Feb 15 18:16:44 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 22 12:14:55 2017 +0100
----------------------------------------------------------------------
.../runtime/checkpoint/CheckpointMetaData.java | 81 +-------------------
.../runtime/checkpoint/CheckpointMetrics.java | 20 ++++-
.../runtime/jobgraph/tasks/StatefulTask.java | 6 +-
.../jobmanager/JobManagerHARecoveryTest.java | 2 +-
.../runtime/taskmanager/TaskAsyncCallTest.java | 3 +-
.../streaming/runtime/io/BarrierBuffer.java | 5 +-
.../streaming/runtime/io/BarrierTracker.java | 10 +--
.../streaming/runtime/tasks/OperatorChain.java | 1 -
.../streaming/runtime/tasks/StreamTask.java | 49 ++++++------
.../io/BarrierBufferAlignmentLimitTest.java | 8 +-
.../streaming/runtime/io/BarrierBufferTest.java | 35 +++++----
.../runtime/io/BarrierTrackerTest.java | 3 +-
.../runtime/tasks/BlockingCheckpointsTest.java | 3 +-
13 files changed, 87 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
index 2627b22..9960b44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.flink.util.Preconditions;
-
import java.io.Serializable;
/**
@@ -35,65 +33,9 @@ public class CheckpointMetaData implements Serializable {
/** The timestamp of the checkpoint */
private final long timestamp;
- private final CheckpointMetrics metrics;
-
public CheckpointMetaData(long checkpointId, long timestamp) {
this.checkpointId = checkpointId;
this.timestamp = timestamp;
- this.metrics = new CheckpointMetrics();
- }
-
- public CheckpointMetaData(
- long checkpointId,
- long timestamp,
- long synchronousDurationMillis,
- long asynchronousDurationMillis,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos) {
- this.checkpointId = checkpointId;
- this.timestamp = timestamp;
- this.metrics = new CheckpointMetrics(
- bytesBufferedInAlignment,
- alignmentDurationNanos,
- synchronousDurationMillis,
- asynchronousDurationMillis);
- }
-
- public CheckpointMetaData(
- long checkpointId,
- long timestamp,
- CheckpointMetrics metrics) {
- this.checkpointId = checkpointId;
- this.timestamp = timestamp;
- this.metrics = Preconditions.checkNotNull(metrics);
- }
-
- public CheckpointMetrics getMetrics() {
- return metrics;
- }
-
- public CheckpointMetaData setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
- Preconditions.checkArgument(bytesBufferedInAlignment >= 0);
- this.metrics.setBytesBufferedInAlignment(bytesBufferedInAlignment);
- return this;
- }
-
- public CheckpointMetaData setAlignmentDurationNanos(long alignmentDurationNanos) {
- Preconditions.checkArgument(alignmentDurationNanos >= 0);
- this.metrics.setAlignmentDurationNanos(alignmentDurationNanos);
- return this;
- }
-
- public CheckpointMetaData setSyncDurationMillis(long syncDurationMillis) {
- Preconditions.checkArgument(syncDurationMillis >= 0);
- this.metrics.setSyncDurationMillis(syncDurationMillis);
- return this;
- }
-
- public CheckpointMetaData setAsyncDurationMillis(long asyncDurationMillis) {
- Preconditions.checkArgument(asyncDurationMillis >= 0);
- this.metrics.setAsyncDurationMillis(asyncDurationMillis);
- return this;
}
public long getCheckpointId() {
@@ -104,22 +46,6 @@ public class CheckpointMetaData implements Serializable {
return timestamp;
}
- public long getBytesBufferedInAlignment() {
- return metrics.getBytesBufferedInAlignment();
- }
-
- public long getAlignmentDurationNanos() {
- return metrics.getAlignmentDurationNanos();
- }
-
- public long getSyncDurationMillis() {
- return metrics.getSyncDurationMillis();
- }
-
- public long getAsyncDurationMillis() {
- return metrics.getAsyncDurationMillis();
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -132,15 +58,13 @@ public class CheckpointMetaData implements Serializable {
CheckpointMetaData that = (CheckpointMetaData) o;
return (checkpointId == that.checkpointId)
- && (timestamp == that.timestamp)
- && (metrics.equals(that.metrics));
+ && (timestamp == that.timestamp);
}
@Override
public int hashCode() {
int result = (int) (checkpointId ^ (checkpointId >>> 32));
result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
- result = 31 * result + metrics.hashCode();
return result;
}
@@ -149,7 +73,6 @@ public class CheckpointMetaData implements Serializable {
return "CheckpointMetaData{" +
"checkpointId=" + checkpointId +
", timestamp=" + timestamp +
- ", metrics=" + metrics +
'}';
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
index be73adb..a90a2e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.checkpoint;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
import java.io.Serializable;
/**
@@ -49,6 +51,12 @@ public class CheckpointMetrics implements Serializable {
long syncDurationMillis,
long asyncDurationMillis) {
+ // these may be "-1", in case the values are unknown or not set
+ checkArgument(syncDurationMillis >= -1);
+ checkArgument(asyncDurationMillis >= -1);
+ checkArgument(bytesBufferedInAlignment >= -1);
+ checkArgument(alignmentDurationNanos >= -1);
+
this.bytesBufferedInAlignment = bytesBufferedInAlignment;
this.alignmentDurationNanos = alignmentDurationNanos;
this.syncDurationMillis = syncDurationMillis;
@@ -59,32 +67,36 @@ public class CheckpointMetrics implements Serializable {
return bytesBufferedInAlignment;
}
- public void setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
+ public CheckpointMetrics setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+ return this;
}
public long getAlignmentDurationNanos() {
return alignmentDurationNanos;
}
- public void setAlignmentDurationNanos(long alignmentDurationNanos) {
+ public CheckpointMetrics setAlignmentDurationNanos(long alignmentDurationNanos) {
this.alignmentDurationNanos = alignmentDurationNanos;
+ return this;
}
public long getSyncDurationMillis() {
return syncDurationMillis;
}
- public void setSyncDurationMillis(long syncDurationMillis) {
+ public CheckpointMetrics setSyncDurationMillis(long syncDurationMillis) {
this.syncDurationMillis = syncDurationMillis;
+ return this;
}
public long getAsyncDurationMillis() {
return asyncDurationMillis;
}
- public void setAsyncDurationMillis(long asyncDurationMillis) {
+ public CheckpointMetrics setAsyncDurationMillis(long asyncDurationMillis) {
this.asyncDurationMillis = asyncDurationMillis;
+ return this;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 39ddc961..87b66ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.state.TaskStateHandles;
/**
@@ -41,7 +42,7 @@ public interface StatefulTask {
*
* <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
* i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
- * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData)}
+ * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)}
* method.
*
* @param checkpointMetaData Meta data for about this checkpoint
@@ -55,10 +56,11 @@ public interface StatefulTask {
* barriers on all input streams.
*
* @param checkpointMetaData Meta data for about this checkpoint
+ * @param checkpointMetrics Metrics about this checkpoint
*
* @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
*/
- void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception;
+ void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception;
/**
* Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index c7c35ec..de54d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -619,7 +619,7 @@ public class JobManagerHARecoveryTest {
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
throw new UnsupportedOperationException("should not be called!");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 26b8cdb..187163d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -242,7 +243,7 @@ public class TaskAsyncCallTest {
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
throw new UnsupportedOperationException("Should not be called");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index e91c26a..611bd44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
@@ -363,11 +364,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L;
- checkpointMetaData
+ CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(bytesBuffered)
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9351f1b..77608c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -250,12 +251,11 @@ public class BarrierTracker implements CheckpointBarrierHandler {
private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
if (toNotifyOnCheckpoint != null) {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+ CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
+ .setBytesBufferedInAlignment(0L)
+ .setAlignmentDurationNanos(0L);
- checkpointMetaData
- .setBytesBufferedInAlignment(0L)
- .setAlignmentDurationNanos(0L);
-
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 7e24eea..591ed3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -162,7 +162,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
-
public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
try {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 92fc6e5..60afd60 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
@@ -516,10 +517,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
try {
- checkpointMetaData.
- setBytesBufferedInAlignment(0L).
- setAlignmentDurationNanos(0L);
- return performCheckpoint(checkpointMetaData);
+ // No alignment if we inject a checkpoint
+ CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
+ .setBytesBufferedInAlignment(0L)
+ .setAlignmentDurationNanos(0L);
+
+ return performCheckpoint(checkpointMetaData, checkpointMetrics);
}
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
@@ -535,9 +538,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
try {
- performCheckpoint(checkpointMetaData);
+ performCheckpoint(checkpointMetaData, checkpointMetrics);
}
catch (CancelTaskException e) {
throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
@@ -562,7 +565,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
synchronized (lock) {
@@ -576,7 +579,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
operatorChain.broadcastCheckpointBarrier(
checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
- checkpointState(checkpointMetaData);
+ checkpointState(checkpointMetaData, checkpointMetrics);
return true;
}
else {
@@ -629,8 +632,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- private void checkpointState(CheckpointMetaData checkpointMetaData) throws Exception {
- CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData);
+ private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointMetrics);
checkpointingOperation.executeCheckpointing();
}
@@ -868,6 +871,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private List<StreamStateHandle> nonPartitionedStateHandles;
private final CheckpointMetaData checkpointMetaData;
+ private final CheckpointMetrics checkpointMetrics;
private final long asyncStartNanos;
@@ -879,11 +883,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
List<StreamStateHandle> nonPartitionedStateHandles,
List<OperatorSnapshotResult> snapshotInProgressList,
CheckpointMetaData checkpointMetaData,
+ CheckpointMetrics checkpointMetrics,
long asyncStartNanos) {
this.owner = Preconditions.checkNotNull(owner);
this.snapshotInProgressList = Preconditions.checkNotNull(snapshotInProgressList);
this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+ this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
this.nonPartitionedStateHandles = nonPartitionedStateHandles;
this.asyncStartNanos = asyncStartNanos;
@@ -900,9 +906,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
@Override
public void run() {
-
try {
-
// Keyed state handle future, currently only one (the head) operator can have this
KeyGroupsStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
KeyGroupsStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
@@ -925,7 +929,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
- checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis);
+ checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState =
new ChainedStateHandle<>(nonPartitionedStateHandles);
@@ -946,7 +950,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
owner.getEnvironment().acknowledgeCheckpoint(
checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getMetrics(),
+ checkpointMetrics,
subtaskState);
if (LOG.isDebugEnabled()) {
@@ -1039,6 +1043,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final StreamTask<?, ?> owner;
private final CheckpointMetaData checkpointMetaData;
+ private final CheckpointMetrics checkpointMetrics;
private final StreamOperator<?>[] allOperators;
@@ -1050,21 +1055,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final List<StreamStateHandle> nonPartitionedStates;
private final List<OperatorSnapshotResult> snapshotInProgressList;
- public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData) {
+ public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
this.owner = Preconditions.checkNotNull(owner);
this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+ this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
this.allOperators = owner.operatorChain.getAllOperators();
this.nonPartitionedStates = new ArrayList<>(allOperators.length);
this.snapshotInProgressList = new ArrayList<>(allOperators.length);
}
public void executeCheckpointing() throws Exception {
-
startSyncPartNano = System.nanoTime();
boolean failed = true;
try {
-
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
}
@@ -1076,7 +1080,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
startAsyncPartNano = System.nanoTime();
- checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
+ checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
// at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
runAsyncCheckpointingAndAcknowledge();
@@ -1086,8 +1090,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
LOG.debug("{} - finished synchronous part of checkpoint {}." +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
- checkpointMetaData.getSyncDurationMillis());
+ checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
+ checkpointMetrics.getSyncDurationMillis());
}
} finally {
if (failed) {
@@ -1118,8 +1122,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
- checkpointMetaData.getSyncDurationMillis());
+ checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
+ checkpointMetrics.getSyncDurationMillis());
}
}
}
@@ -1152,6 +1156,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
nonPartitionedStates,
snapshotInProgressList,
checkpointMetaData,
+ checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerClosable(asyncCheckpointRunnable);
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 3e618ef..46f228a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -154,7 +155,8 @@ public class BarrierBufferAlignmentLimitTest {
check(sequence[21], buffer.getNextNonBlocked());
// no call for a completed checkpoint must have happened
- verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class),
+ any(CheckpointMetrics.class));
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
@@ -240,7 +242,7 @@ public class BarrierBufferAlignmentLimitTest {
// checkpoint 4 completed - check and validate buffered replay
check(sequence[9], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), any(CheckpointMetrics.class));
check(sequence[10], buffer.getNextNonBlocked());
check(sequence[15], buffer.getNextNonBlocked());
@@ -252,7 +254,7 @@ public class BarrierBufferAlignmentLimitTest {
check(sequence[21], buffer.getNextNonBlocked());
// only checkpoint 4 was successfully completed, not checkpoint 3
- verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d17225c..869d1fe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -565,7 +566,7 @@ public class BarrierBufferTest {
// checkpoint done - replay buffered
check(sequence[5], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+ verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
check(sequence[6], buffer.getNextNonBlocked());
check(sequence[9], buffer.getNextNonBlocked());
@@ -1007,14 +1008,14 @@ public class BarrierBufferTest {
check(sequence[0], buffer.getNextNonBlocked());
check(sequence[2], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[6], buffer.getNextNonBlocked());
assertEquals(5L, buffer.getCurrentCheckpointId());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[8], buffer.getNextNonBlocked());
@@ -1077,7 +1078,7 @@ public class BarrierBufferTest {
check(sequence[2], buffer.getNextNonBlocked());
startTs = System.nanoTime();
check(sequence[5], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[6], buffer.getNextNonBlocked());
@@ -1096,7 +1097,7 @@ public class BarrierBufferTest {
check(sequence[16], buffer.getNextNonBlocked());
startTs = System.nanoTime();
check(sequence[20], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[21], buffer.getNextNonBlocked());
@@ -1113,7 +1114,7 @@ public class BarrierBufferTest {
// a simple successful checkpoint
startTs = System.nanoTime();
check(sequence[32], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[33], buffer.getNextNonBlocked());
@@ -1174,7 +1175,7 @@ public class BarrierBufferTest {
// finished first checkpoint
check(sequence[3], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[5], buffer.getNextNonBlocked());
@@ -1197,7 +1198,7 @@ public class BarrierBufferTest {
assertEquals(0L, buffer.getAlignmentDurationNanos());
// no further checkpoint (abort) notifications
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
// all done
@@ -1279,7 +1280,7 @@ public class BarrierBufferTest {
// checkpoint done
check(sequence[7], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
+ verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
// queued data
check(sequence[10], buffer.getNextNonBlocked());
@@ -1298,7 +1299,7 @@ public class BarrierBufferTest {
checkNoTempFilesRemain();
// check overall notifications
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
@@ -1363,7 +1364,7 @@ public class BarrierBufferTest {
// checkpoint finished
check(sequence[7], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
check(sequence[11], buffer.getNextNonBlocked());
// remaining data
@@ -1379,7 +1380,7 @@ public class BarrierBufferTest {
checkNoTempFilesRemain();
// check overall notifications
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
@@ -1491,17 +1492,17 @@ public class BarrierBufferTest {
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
assertTrue("wrong checkpoint id",
nextExpectedCheckpointId == -1L ||
nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());
assertTrue(checkpointMetaData.getTimestamp() > 0);
- assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0);
- assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0);
+ assertTrue(checkpointMetrics.getBytesBufferedInAlignment() >= 0);
+ assertTrue(checkpointMetrics.getAlignmentDurationNanos() >= 0);
nextExpectedCheckpointId++;
- lastReportedBytesBufferedInAlignment = checkpointMetaData.getBytesBufferedInAlignment();
+ lastReportedBytesBufferedInAlignment = checkpointMetrics.getBytesBufferedInAlignment();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 0d9e6ac..da322f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -506,7 +507,7 @@ public class BarrierTrackerTest {
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
assertTrue("More checkpoints than expected", i < checkpointIDs.length);
final long expectedId = checkpointIDs[i++];
http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 492b470..5c0f0cf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -275,7 +276,7 @@ public class BlockingCheckpointsTest {
@Override
protected void run() throws Exception {
- triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()));
+ triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), new CheckpointMetrics());
}
@Override
[3/4] flink git commit: [FLINK-5763] [checkpoints] Add isSavepoint()
to CheckpointProperties
Posted by se...@apache.org.
[FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties
This closes #3345
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcc1efcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcc1efcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcc1efcb
Branch: refs/heads/master
Commit: fcc1efcb05bce13e435946107a842727b1e3ee20
Parents: 1cb8cde
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Feb 16 16:52:32 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 22 12:14:55 2017 +0100
----------------------------------------------------------------------
.../CheckpointStatsDetailsHandler.java | 3 +-
.../checkpoints/CheckpointStatsHandler.java | 5 ++-
.../CheckpointStatsDetailsHandlerTest.java | 6 ++--
.../checkpoints/CheckpointStatsHandlerTest.java | 8 ++---
.../checkpoint/CheckpointProperties.java | 32 ++++++++++----------
.../checkpoint/CheckpointStatsHistory.java | 2 +-
.../checkpoint/CheckpointPropertiesTest.java | 8 ++---
.../checkpoint/CheckpointStatsHistoryTest.java | 3 ++
8 files changed, 34 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 33d6cf7..d461f03 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -80,7 +79,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
gen.writeNumberField("id", checkpoint.getCheckpointId());
gen.writeStringField("status", checkpoint.getStatus().toString());
- gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+ gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
gen.writeNumberField("state_size", checkpoint.getStateSize());
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 8aab5fa..404b2c7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -170,7 +169,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
gen.writeObjectFieldStart("restored");
gen.writeNumberField("id", restored.getCheckpointId());
gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
- gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(restored.getProperties()));
+ gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint());
String externalPath = restored.getExternalPath();
if (externalPath != null) {
@@ -197,7 +196,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
gen.writeStartObject();
gen.writeNumberField("id", checkpoint.getCheckpointId());
gen.writeStringField("status", checkpoint.getStatus().toString());
- gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+ gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
gen.writeNumberField("state_size", checkpoint.getStateSize());
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index fb5cfc5..dfad46d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -128,7 +128,7 @@ public class CheckpointStatsDetailsHandlerTest {
assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+ assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
@@ -172,7 +172,7 @@ public class CheckpointStatsDetailsHandlerTest {
assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+ assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
@@ -218,7 +218,7 @@ public class CheckpointStatsDetailsHandlerTest {
assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+ assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 23a1900..29f3ae9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -241,7 +241,7 @@ public class CheckpointStatsHandlerTest {
JsonNode latestRestoredNode = latestNode.get("restored");
assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong());
assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong());
- assertEquals(CheckpointProperties.isSavepoint(latestRestored.getProperties()), latestRestoredNode.get("is_savepoint").asBoolean());
+ assertEquals(latestRestored.getProperties().isSavepoint(), latestRestoredNode.get("is_savepoint").asBoolean());
assertEquals(latestRestored.getExternalPath(), latestRestoredNode.get("external_path").asText());
JsonNode historyNode = rootNode.get("history");
@@ -252,7 +252,7 @@ public class CheckpointStatsHandlerTest {
assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong());
assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(inProgress.getProperties()), inProgressNode.get("is_savepoint").asBoolean());
+ assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean());
assertEquals(inProgress.getTriggerTimestamp(), inProgressNode.get("trigger_timestamp").asLong());
assertEquals(inProgress.getLatestAckTimestamp(), inProgressNode.get("latest_ack_timestamp").asLong());
assertEquals(inProgress.getStateSize(), inProgressNode.get("state_size").asLong());
@@ -266,7 +266,7 @@ public class CheckpointStatsHandlerTest {
assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong());
assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(completedSavepoint.getProperties()), completedSavepointNode.get("is_savepoint").asBoolean());
+ assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean());
assertEquals(completedSavepoint.getTriggerTimestamp(), completedSavepointNode.get("trigger_timestamp").asLong());
assertEquals(completedSavepoint.getLatestAckTimestamp(), completedSavepointNode.get("latest_ack_timestamp").asLong());
assertEquals(completedSavepoint.getStateSize(), completedSavepointNode.get("state_size").asLong());
@@ -283,7 +283,7 @@ public class CheckpointStatsHandlerTest {
assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong());
assertEquals(failed.getStatus().toString(), failedNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(failed.getProperties()), failedNode.get("is_savepoint").asBoolean());
+ assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean());
assertEquals(failed.getTriggerTimestamp(), failedNode.get("trigger_timestamp").asLong());
assertEquals(failed.getLatestAckTimestamp(), failedNode.get("latest_ack_timestamp").asLong());
assertEquals(failed.getStateSize(), failedNode.get("state_size").asLong());
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 4d8bab2..45c8a1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -86,7 +86,7 @@ public class CheckpointProperties implements Serializable {
* @see CheckpointCoordinator
* @see PendingCheckpoint
*/
- public boolean forceCheckpoint() {
+ boolean forceCheckpoint() {
return forced;
}
@@ -98,7 +98,7 @@ public class CheckpointProperties implements Serializable {
*
* @see PendingCheckpoint
*/
- public boolean externalizeCheckpoint() {
+ boolean externalizeCheckpoint() {
return externalize;
}
@@ -117,7 +117,7 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnSubsumed() {
+ boolean discardOnSubsumed() {
return discardSubsumed;
}
@@ -131,7 +131,7 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnJobFinished() {
+ boolean discardOnJobFinished() {
return discardFinished;
}
@@ -145,7 +145,7 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnJobCancelled() {
+ boolean discardOnJobCancelled() {
return discardCancelled;
}
@@ -159,7 +159,7 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnJobFailed() {
+ boolean discardOnJobFailed() {
return discardFailed;
}
@@ -173,10 +173,19 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnJobSuspended() {
+ boolean discardOnJobSuspended() {
return discardSuspended;
}
+ /**
+ * Returns whether the checkpoint properties describe a standard savepoint.
+ *
+ * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
+ */
+ public boolean isSavepoint() {
+ return this == STANDARD_SAVEPOINT;
+ }
+
// ------------------------------------------------------------------------
@Override
@@ -306,13 +315,4 @@ public class CheckpointProperties implements Serializable {
}
}
- /**
- * Returns whether the checkpoint properties describe a standard savepoint.
- *
- * @param props Checkpoint properties to check.
- * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
- */
- public static boolean isSavepoint(CheckpointProperties props) {
- return STANDARD_SAVEPOINT.equals(props);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
index 13ce642..ce14c2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -255,7 +255,7 @@ public class CheckpointStatsHistory implements Serializable {
// Update the latest checkpoint stats
if (completedOrFailed.getStatus().isCompleted()) {
CompletedCheckpointStats completed = (CompletedCheckpointStats) completedOrFailed;
- if (CheckpointProperties.isSavepoint(completed.getProperties()) &&
+ if (completed.getProperties().isSavepoint() &&
(latestSavepoint == null ||
completed.getCheckpointId() > latestSavepoint.getCheckpointId())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index fb3bd65..52ac54c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -93,22 +93,22 @@ public class CheckpointPropertiesTest {
public void testIsSavepoint() throws Exception {
{
CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
- assertFalse(CheckpointProperties.isSavepoint(props));
+ assertFalse(props.isSavepoint());
}
{
CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
- assertFalse(CheckpointProperties.isSavepoint(props));
+ assertFalse(props.isSavepoint());
}
{
CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(false);
- assertFalse(CheckpointProperties.isSavepoint(props));
+ assertFalse(props.isSavepoint());
}
{
CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
- assertTrue(CheckpointProperties.isSavepoint(props));
+ assertTrue(props.isSavepoint());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 7541806..3c373f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -175,12 +175,14 @@ public class CheckpointStatsHistoryTest {
PendingCheckpointStats pending = mock(PendingCheckpointStats.class);
when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
when(pending.getCheckpointId()).thenReturn(checkpointId);
+ when(pending.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
return pending;
}
private CompletedCheckpointStats createCompletedCheckpointStats(long checkpointId) {
CompletedCheckpointStats completed = mock(CompletedCheckpointStats.class);
when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+ when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
when(completed.getCheckpointId()).thenReturn(checkpointId);
return completed;
}
@@ -188,6 +190,7 @@ public class CheckpointStatsHistoryTest {
private FailedCheckpointStats createFailedCheckpointStats(long checkpointId) {
FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+ when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
when(failed.getCheckpointId()).thenReturn(checkpointId);
return failed;
}
[2/4] flink git commit: [FLINK-5763] [checkpoints] Acknowledge with
explicit ID and CheckpointMetrics
Posted by se...@apache.org.
[FLINK-5763] [checkpoints] Acknowledge with explicit ID and CheckpointMetrics
Instead of acknowledging checkpoints with the CheckpointMetaData make
the acknowledgement explicit by ID and CheckpointMetrics. The rest is
not needed.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2edc9718
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2edc9718
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2edc9718
Branch: refs/heads/master
Commit: 2edc97185700a5bdb3e181a71493d681c0f693e3
Parents: 2d2ffba
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Feb 15 17:52:40 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 22 12:14:55 2017 +0100
----------------------------------------------------------------------
.../state/RocksDBAsyncSnapshotTest.java | 6 +-
.../checkpoint/CheckpointCoordinator.java | 2 +-
.../CheckpointCoordinatorGateway.java | 3 +-
.../runtime/checkpoint/CheckpointMetrics.java | 2 +-
.../runtime/checkpoint/PendingCheckpoint.java | 27 ++---
.../flink/runtime/execution/Environment.java | 17 +--
.../flink/runtime/jobmaster/JobMaster.java | 7 +-
.../checkpoint/AcknowledgeCheckpoint.java | 35 +++---
.../rpc/RpcCheckpointResponder.java | 9 +-
.../ActorGatewayCheckpointResponder.java | 7 +-
.../taskmanager/CheckpointResponder.java | 12 +-
.../runtime/taskmanager/RuntimeEnvironment.java | 13 +-
.../CheckpointCoordinatorFailureTest.java | 3 +-
.../checkpoint/CheckpointCoordinatorTest.java | 119 ++++++++++---------
.../checkpoint/CheckpointStateRestoreTest.java | 11 +-
.../checkpoint/PendingCheckpointTest.java | 9 +-
.../jobmanager/JobManagerHARecoveryTest.java | 4 +-
.../messages/CheckpointMessagesTest.java | 7 +-
.../operators/testutils/DummyEnvironment.java | 6 +-
.../operators/testutils/MockEnvironment.java | 6 +-
.../runtime/util/JvmExitOnFatalErrorTest.java | 4 +-
.../streaming/runtime/tasks/StreamTask.java | 5 +-
.../operators/async/AsyncWaitOperatorTest.java | 6 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 6 +-
.../runtime/tasks/StreamMockEnvironment.java | 6 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 7 +-
26 files changed, 174 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 6587291..bce8028 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -151,10 +152,11 @@ public class RocksDBAsyncSnapshotTest {
@Override
public void acknowledgeCheckpoint(
- CheckpointMetaData checkpointMetaData,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
SubtaskState checkpointStateHandles) {
- super.acknowledgeCheckpoint(checkpointMetaData);
+ super.acknowledgeCheckpoint(checkpointId, checkpointMetrics);
// block on the latch, to verify that triggerCheckpoint returns below,
// even though the async checkpoint would not finish
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6cac006..36649ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -648,7 +648,7 @@ public class CheckpointCoordinator {
if (checkpoint != null && !checkpoint.isDiscarded()) {
- switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetaData())) {
+ switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 8d1423a..43d66ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -27,7 +27,8 @@ public interface CheckpointCoordinatorGateway extends RpcGateway {
void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
- final CheckpointMetaData checkpointMetaData,
+ final long checkpointId,
+ final CheckpointMetrics checkpointMetrics,
final SubtaskState subtaskState);
void declineCheckpoint(
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
index f72b00e..be73adb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -129,4 +129,4 @@ public class CheckpointMetrics implements Serializable {
", asyncDurationMillis=" + asyncDurationMillis +
'}';
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1531f0f..9f66314 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -18,6 +18,16 @@
package org.apache.flink.runtime.checkpoint;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -35,17 +45,6 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* A pending checkpoint is a checkpoint that has been started, but has not been
* acknowledged by all tasks that need to acknowledge it. Once all tasks have
@@ -250,13 +249,13 @@ public class PendingCheckpoint {
*
* @param executionAttemptId of the acknowledged task
* @param subtaskState of the acknowledged task
- * @param checkpointMetaData Checkpoint meta data
+ * @param metrics Checkpoint metrics for the stats
* @return TaskAcknowledgeResult of the operation
*/
public TaskAcknowledgeResult acknowledgeTask(
ExecutionAttemptID executionAttemptId,
SubtaskState subtaskState,
- CheckpointMetaData checkpointMetaData) {
+ CheckpointMetrics metrics) {
synchronized (lock) {
if (discarded) {
@@ -314,8 +313,6 @@ public class PendingCheckpoint {
++numAcknowledgedTasks;
if (statsCallback != null) {
- CheckpointMetrics metrics = checkpointMetaData.getMetrics();
-
// Do this in millis because the web frontend works with them
long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 1675365..9e9f7c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.execution;
+import java.util.Map;
+import java.util.concurrent.Future;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
@@ -25,7 +27,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -39,9 +41,6 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import java.util.Map;
-import java.util.concurrent.Future;
-
/**
* The Environment gives the code executed in a task access to the task's properties
* (such as name, parallelism), the configurations, the data stream readers and writers,
@@ -162,19 +161,21 @@ public interface Environment {
* to for the checkpoint with the give checkpoint-ID. This method does not include
* any state in the checkpoint.
*
- * @param checkpointMetaData the meta data for this checkpoint
+ * @param checkpointId ID of this checkpoint
+ * @param checkpointMetrics metrics for this checkpoint
*/
- void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData);
+ void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics);
/**
* Confirms that the invokable has successfully completed all required steps for
* the checkpoint with the give checkpoint-ID. This method does include
* the given state in the checkpoint.
*
- * @param checkpointMetaData the meta data for this checkpoint
+ * @param checkpointId ID of this checkpoint
+ * @param checkpointMetrics metrics for this checkpoint
* @param subtaskState All state handles for the checkpointed state
*/
- void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState subtaskState);
+ void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState subtaskState);
/**
* Declines a checkpoint. This tells the checkpoint coordinator that this task will
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a318657..941248f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -30,7 +30,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -519,12 +519,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
- final CheckpointMetaData checkpointInfo,
+ final long checkpointId,
+ final CheckpointMetrics checkpointMetrics,
final SubtaskState checkpointState) throws CheckpointException {
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
final AcknowledgeCheckpoint ackMessage =
- new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointInfo, checkpointState);
+ new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
if (checkpointCoordinator != null) {
getRpcService().execute(new Runnable() {
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index 7ec3efa..9721c2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -18,13 +18,12 @@
package org.apache.flink.runtime.messages.checkpoint;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
/**
* This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
* {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
@@ -39,32 +38,26 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
private final SubtaskState subtaskState;
- private final CheckpointMetaData checkpointMetaData;
+ private final CheckpointMetrics checkpointMetrics;
// ------------------------------------------------------------------------
public AcknowledgeCheckpoint(
JobID job,
ExecutionAttemptID taskExecutionId,
- CheckpointMetaData checkpointMetaData) {
- this(job, taskExecutionId, checkpointMetaData, null);
- }
-
- public AcknowledgeCheckpoint(
- JobID job,
- ExecutionAttemptID taskExecutionId,
- CheckpointMetaData checkpointMetaData,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
SubtaskState subtaskState) {
- super(job, taskExecutionId, checkpointMetaData.getCheckpointId());
+ super(job, taskExecutionId, checkpointId);
this.subtaskState = subtaskState;
- this.checkpointMetaData = checkpointMetaData;
- // these may be "-1", in case the values are unknown or not set
- checkArgument(checkpointMetaData.getSyncDurationMillis() >= -1);
- checkArgument(checkpointMetaData.getAsyncDurationMillis() >= -1);
- checkArgument(checkpointMetaData.getBytesBufferedInAlignment() >= -1);
- checkArgument(checkpointMetaData.getAlignmentDurationNanos() >= -1);
+ this.checkpointMetrics = checkpointMetrics;
+ }
+
+ @VisibleForTesting
+ public AcknowledgeCheckpoint(JobID jobId, ExecutionAttemptID taskExecutionId, long checkpointId) {
+ this(jobId, taskExecutionId, checkpointId, new CheckpointMetrics(), null);
}
// ------------------------------------------------------------------------
@@ -75,8 +68,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
return subtaskState;
}
- public CheckpointMetaData getCheckpointMetaData() {
- return checkpointMetaData;
+ public CheckpointMetrics getCheckpointMetrics() {
+ return checkpointMetrics;
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index 1ce4350..bf60161 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskexecutor.rpc;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -38,15 +38,16 @@ public class RpcCheckpointResponder implements CheckpointResponder {
public void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
- CheckpointMetaData checkpointMetaData,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
SubtaskState subtaskState) {
checkpointCoordinatorGateway.acknowledgeCheckpoint(
jobID,
executionAttemptID,
- checkpointMetaData,
+ checkpointId,
+ checkpointMetrics,
subtaskState);
-
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index dafcefe..ad0df71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -42,11 +42,12 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
public void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
- CheckpointMetaData checkpointMetaData,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
SubtaskState checkpointStateHandles) {
AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
- jobID, executionAttemptID, checkpointMetaData,
+ jobID, executionAttemptID, checkpointId, checkpointMetrics,
checkpointStateHandles);
actorGateway.tell(message);
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index cdf87d3..cc66a3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -35,16 +35,18 @@ public interface CheckpointResponder {
* Job ID of the running job
* @param executionAttemptID
* Execution attempt ID of the running task
+ * @param checkpointId
+ * Meta data for this checkpoint
+ * @param checkpointMetrics
+ * Metrics of this checkpoint
* @param subtaskState
* State handles for the checkpoint
- * @param checkpointMetaData
- * Meta data for this checkpoint
- *
*/
void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
- CheckpointMetaData checkpointMetaData,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
SubtaskState subtaskState);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 7fe94a6..788a590 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -237,19 +237,18 @@ public class RuntimeEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
-
- acknowledgeCheckpoint(checkpointMetaData, null);
+ public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
+ acknowledgeCheckpoint(checkpointId, checkpointMetrics, null);
}
@Override
public void acknowledgeCheckpoint(
- CheckpointMetaData checkpointMetaData,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
SubtaskState checkpointStateHandles) {
-
checkpointResponder.acknowledgeCheckpoint(
- jobId, executionId, checkpointMetaData,
+ jobId, executionId, checkpointId, checkpointMetrics,
checkpointStateHandles);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index d3a440a..d4c3a2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -84,8 +84,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
- final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, triggerTimestamp);
- AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointMetaData);
+ AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId);
CompletedCheckpoint completedCheckpoint = mock(CompletedCheckpoint.class);
PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint);
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6ba557b..c2ada3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -319,14 +319,14 @@ public class CheckpointCoordinatorTest {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
// acknowledge from one of the tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
// acknowledge the same task again (should not matter)
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
@@ -533,22 +533,20 @@ public class CheckpointCoordinatorTest {
verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
}
- CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
-
// acknowledge from one of the tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
// acknowledge the same task again (should not matter)
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
// acknowledge the other task.
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
// the checkpoint is internally converted to a successful checkpoint and the
// pending checkpoint object is disposed
@@ -577,9 +575,8 @@ public class CheckpointCoordinatorTest {
coord.triggerCheckpoint(timestampNew, false);
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -669,7 +666,7 @@ public class CheckpointCoordinatorTest {
CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
// acknowledge one of the three tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
// start the second checkpoint
// trigger the first checkpoint. this should succeed
@@ -695,10 +692,10 @@ public class CheckpointCoordinatorTest {
// we acknowledge the remaining two tasks from the first
// checkpoint and two tasks from the second checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData1));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
// now, the first checkpoint should be confirmed
assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -709,7 +706,7 @@ public class CheckpointCoordinatorTest {
verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId1), eq(timestamp1));
// send the last remaining ack for the second checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
// now, the second checkpoint should be confirmed
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -803,7 +800,7 @@ public class CheckpointCoordinatorTest {
CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
// acknowledge one of the three tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
// start the second checkpoint
// trigger the first checkpoint. this should succeed
@@ -829,10 +826,10 @@ public class CheckpointCoordinatorTest {
// checkpoint completely. The second checkpoint should then subsume the first checkpoint
CheckpointMetaData checkpointMetaData2= new CheckpointMetaData(checkpointId2, 0L);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
// now, the second checkpoint should be confirmed, and the first discarded
// actually both pending checkpoints are discarded, and the second has been transformed
@@ -855,7 +852,7 @@ public class CheckpointCoordinatorTest {
verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), eq(timestamp2));
// send the last remaining ack for the first checkpoint. This should not do anything
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, new CheckpointMetaData(checkpointId1, 0L)));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
coord.shutdown(JobStatus.FINISHED);
}
@@ -912,7 +909,7 @@ public class CheckpointCoordinatorTest {
PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
assertFalse(checkpoint.isDiscarded());
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(checkpoint.getCheckpointId(), 0L)));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId()));
// wait until the checkpoint must have expired.
// we check every 250 msecs conservatively for 5 seconds
@@ -984,13 +981,13 @@ public class CheckpointCoordinatorTest {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
// wrong job id
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
// unknown checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(1L, 0L)));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L));
// unknown ack vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
coord.shutdown(JobStatus.FINISHED);
}
@@ -1051,12 +1048,12 @@ public class CheckpointCoordinatorTest {
SubtaskState triggerSubtaskState = mock(SubtaskState.class);
// acknowledge the first trigger vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, triggerSubtaskState));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState));
SubtaskState unknownSubtaskState = mock(SubtaskState.class);
// receive an acknowledge message for an unknown vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState));
// we should discard acknowledge messages from an unknown vertex belonging to our job
verify(unknownSubtaskState, times(1)).discardState();
@@ -1064,13 +1061,13 @@ public class CheckpointCoordinatorTest {
SubtaskState differentJobSubtaskState = mock(SubtaskState.class);
// receive an acknowledge message from an unknown job
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState));
// we should not interfere with different jobs
verify(differentJobSubtaskState, never()).discardState();
// duplicate acknowledge message for the trigger vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, triggerSubtaskState));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState));
// duplicate acknowledge messages for a known vertex should not trigger discarding the state
verify(triggerSubtaskState, never()).discardState();
@@ -1086,13 +1083,13 @@ public class CheckpointCoordinatorTest {
SubtaskState ackSubtaskState = mock(SubtaskState.class);
// late acknowledge message from the second ack vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointMetaData, ackSubtaskState));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState));
// check that we also cleaned up this state
verify(ackSubtaskState, times(1)).discardState();
// receive an acknowledge message from an unknown job
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState));
// we should not interfere with different jobs
verify(differentJobSubtaskState, never()).discardState();
@@ -1100,7 +1097,7 @@ public class CheckpointCoordinatorTest {
SubtaskState unknownSubtaskState2 = mock(SubtaskState.class);
// receive an acknowledge message for an unknown vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2));
// we should discard acknowledge messages from an unknown vertex belonging to our job
verify(unknownSubtaskState2, times(1)).discardState();
@@ -1261,8 +1258,7 @@ public class CheckpointCoordinatorTest {
Long firstCallId = triggerCalls.take();
assertEquals(1L, firstCallId.longValue());
- AcknowledgeCheckpoint ackMsg = new AcknowledgeCheckpoint(
- jid, attemptID, new CheckpointMetaData(1L, System.currentTimeMillis()));
+ AcknowledgeCheckpoint ackMsg = new AcknowledgeCheckpoint(jid, attemptID, 1L);
// tell the coordinator that the checkpoint is done
final long ackTime = System.nanoTime();
@@ -1357,7 +1353,7 @@ public class CheckpointCoordinatorTest {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
// acknowledge from one of the tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
assertEquals(1, pending.getNumberOfAcknowledgedTasks());
assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
assertFalse(pending.isDiscarded());
@@ -1365,13 +1361,13 @@ public class CheckpointCoordinatorTest {
assertFalse(savepointFuture.isDone());
// acknowledge the same task again (should not matter)
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
assertFalse(pending.isDiscarded());
assertFalse(pending.isFullyAcknowledged());
assertFalse(savepointFuture.isDone());
// acknowledge the other task.
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
// the checkpoint is internally converted to a successful checkpoint and the
// pending checkpoint object is disposed
@@ -1403,8 +1399,8 @@ public class CheckpointCoordinatorTest {
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1481,8 +1477,8 @@ public class CheckpointCoordinatorTest {
CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
// 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1499,8 +1495,8 @@ public class CheckpointCoordinatorTest {
assertEquals(3, coord.getNumberOfPendingCheckpoints());
// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1510,8 +1506,8 @@ public class CheckpointCoordinatorTest {
assertTrue(savepointFuture2.isDone());
// Ack first savepoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS1));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1585,7 +1581,7 @@ public class CheckpointCoordinatorTest {
.triggerCheckpoint(anyLong(), anyLong());
// now, once we acknowledge one checkpoint, it should trigger the next one
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(1L, 0L)));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
// this should have immediately triggered a new checkpoint
now = System.currentTimeMillis();
@@ -1660,7 +1656,7 @@ public class CheckpointCoordinatorTest {
// now we acknowledge the second checkpoint, which should subsume the first checkpoint
// and allow two more checkpoints to be triggered
// now, once we acknowledge one checkpoint, it should trigger the next one
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(2L, 0L)));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L));
// after a while, there should be the new checkpoints
final long newTimeout = System.currentTimeMillis() + 60000;
@@ -1792,7 +1788,7 @@ public class CheckpointCoordinatorTest {
// ACK all savepoints
long checkpointId = checkpointIDCounter.getLast();
for (int i = 0; i < numSavepoints; i++, checkpointId--) {
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, new CheckpointMetaData(checkpointId, 0L)));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId));
}
// After ACKs, all should be completed
@@ -1905,7 +1901,8 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointMetaData,
+ checkpointId,
+ new CheckpointMetrics(),
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1919,7 +1916,8 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointMetaData,
+ checkpointId,
+ new CheckpointMetrics(),
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2008,7 +2006,8 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointMetaData,
+ checkpointId,
+ new CheckpointMetrics(),
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2022,7 +2021,8 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointMetaData,
+ checkpointId,
+ new CheckpointMetrics(),
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2127,7 +2127,8 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointMetaData,
+ checkpointId,
+ new CheckpointMetrics(),
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2144,7 +2145,8 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointMetaData,
+ checkpointId,
+ new CheckpointMetrics(),
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2260,12 +2262,12 @@ public class CheckpointCoordinatorTest {
KeyGroupsStateHandle keyedStateBackend = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
KeyGroupsStateHandle keyedStateRaw = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), true);
-
SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointMetaData,
+ checkpointId,
+ new CheckpointMetrics(),
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2287,7 +2289,8 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointMetaData,
+ checkpointId,
+ new CheckpointMetrics(),
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 0e20ebc8..18b07eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -118,12 +118,11 @@ public class CheckpointStateRestoreTest {
final long checkpointId = pending.getCheckpointId();
SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null);
- CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 4358526..3a85c4c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -44,7 +44,6 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -95,7 +94,7 @@ public class PendingCheckpointTest {
CheckpointProperties persisted = new CheckpointProperties(false, true, false, false, false, false, false);
PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath());
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp()));
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
assertEquals(0, tmp.listFiles().length);
pending.finalizeCheckpoint();
assertEquals(1, tmp.listFiles().length);
@@ -103,7 +102,7 @@ public class PendingCheckpointTest {
// Ephemeral checkpoint
CheckpointProperties ephemeral = new CheckpointProperties(false, false, true, true, true, true, true);
pending = createPendingCheckpoint(ephemeral, null);
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp()));
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
assertEquals(1, tmp.listFiles().length);
pending.finalizeCheckpoint();
@@ -148,7 +147,7 @@ public class PendingCheckpointTest {
future = pending.getCompletionFuture();
assertFalse(future.isDone());
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp()));
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
pending.finalizeCheckpoint();
assertTrue(future.isDone());
@@ -231,7 +230,7 @@ public class PendingCheckpointTest {
PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
pending.setStatsCallback(callback);
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp()));
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
verify(callback, times(1)).reportSubtaskStats(any(JobVertexID.class), any(SubtaskStateStats.class));
pending.finalizeCheckpoint();
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 5f2edac..c7c35ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -611,7 +612,8 @@ public class JobManagerHARecoveryTest {
new SubtaskState(chainedStateHandle, null, null, null, null);
getEnvironment().acknowledgeCheckpoint(
- new CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L),
+ checkpointMetaData.getCheckpointId(),
+ new CheckpointMetrics(0L, 0L, 0L, 0L),
checkpointStateHandles);
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 9aa35e0..db45231 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -63,7 +63,7 @@ public class CheckpointMessagesTest {
public void testConfirmTaskCheckpointed() {
try {
AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint(
- new JobID(), new ExecutionAttemptID(), new CheckpointMetaData(569345L, 0L));
+ new JobID(), new ExecutionAttemptID(), 569345L);
KeyGroupRange keyGroupRange = KeyGroupRange.of(42,42);
@@ -78,7 +78,8 @@ public class CheckpointMessagesTest {
AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
new JobID(),
new ExecutionAttemptID(),
- new CheckpointMetaData(87658976143L, 0L),
+ 87658976143L,
+ new CheckpointMetrics(),
checkpointStateHandles);
testSerializabilityEqualsHashCode(noState);
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 82d8cc1..851fa96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -152,11 +152,11 @@ public class DummyEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
+ public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
}
@Override
- public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) {
+ public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState subtaskState) {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index bfede5b..49175c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -310,12 +310,12 @@ public class MockEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
+ public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
throw new UnsupportedOperationException();
}
@Override
- public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) {
+ public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState subtaskState) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 10f4303..c78a3d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Future;
@@ -235,7 +235,7 @@ public class JvmExitOnFatalErrorTest {
private static final class NoOpCheckpointResponder implements CheckpointResponder {
@Override
- public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, CheckpointMetaData c, SubtaskState s) {}
+ public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, long i, CheckpointMetrics c, SubtaskState s) {}
@Override
public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d734dc9..92fc6e5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -944,7 +944,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
keyedStateHandleStream);
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
- owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
+ owner.getEnvironment().acknowledgeCheckpoint(
+ checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getMetrics(),
+ subtaskState);
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 0255ee6..907f8f1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -607,10 +608,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
@Override
public void acknowledgeCheckpoint(
- CheckpointMetaData checkpointMetaData,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
SubtaskState checkpointStateHandles) {
- this.checkpointId = checkpointMetaData.getCheckpointId();
+ this.checkpointId = checkpointId;
this.checkpointStateHandles = checkpointStateHandles;
checkpointLatch.trigger();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 4b08c83..69c2c88 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -679,10 +680,11 @@ public class OneInputStreamTaskTest extends TestLogger {
@Override
public void acknowledgeCheckpoint(
- CheckpointMetaData checkpointMetaData,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
SubtaskState checkpointStateHandles) {
- this.checkpointId = checkpointMetaData.getCheckpointId();
+ this.checkpointId = checkpointId;
this.checkpointStateHandles = checkpointStateHandles;
checkpointLatch.trigger();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 58912ab..ff07fa2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
@@ -327,12 +328,11 @@ public class StreamMockEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
+ public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
}
@Override
- public void acknowledgeCheckpoint(
- CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) {
+ public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState subtaskState) {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 887ea4f..d33d1b6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -447,7 +448,7 @@ public class StreamTaskTest extends TestLogger {
return null;
}
- }).when(mockEnvironment).acknowledgeCheckpoint(any(CheckpointMetaData.class), any(SubtaskState.class));
+ }).when(mockEnvironment).acknowledgeCheckpoint(anyLong(), any(CheckpointMetrics.class), any(SubtaskState.class));
StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -500,7 +501,7 @@ public class StreamTaskTest extends TestLogger {
ArgumentCaptor<SubtaskState> subtaskStateCaptor = ArgumentCaptor.forClass(SubtaskState.class);
// check that the checkpoint has been completed
- verify(mockEnvironment).acknowledgeCheckpoint(eq(checkpointMetaData), subtaskStateCaptor.capture());
+ verify(mockEnvironment).acknowledgeCheckpoint(eq(checkpointId), any(CheckpointMetrics.class), subtaskStateCaptor.capture());
SubtaskState subtaskState = subtaskStateCaptor.getValue();
@@ -628,7 +629,7 @@ public class StreamTaskTest extends TestLogger {
}
// check that the checkpoint has not been acknowledged
- verify(mockEnvironment, never()).acknowledgeCheckpoint(any(CheckpointMetaData.class), any(SubtaskState.class));
+ verify(mockEnvironment, never()).acknowledgeCheckpoint(eq(checkpointId), any(CheckpointMetrics.class), any(SubtaskState.class));
// check that the state handles have been discarded
verify(managedKeyedStateHandle).discardState();