You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/10/28 08:22:35 UTC

[flink] 05/08: [FLINK-13904][tests] Support checkpoint consumer of SimpleAckingTaskManagerGateway

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc196737b818261d039d6ecb2c1555c340f0e2c0
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Sun Sep 29 14:50:09 2019 +0800

    [FLINK-13904][tests] Support checkpoint consumer of SimpleAckingTaskManagerGateway
---
 .../executiongraph/utils/SimpleAckingTaskManagerGateway.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 0d07f3d..e09d8be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph.utils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -55,6 +56,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	private BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = (ignore1, ignore2) -> { };
 
+	private Consumer<Tuple6<ExecutionAttemptID, JobID, Long, Long, CheckpointOptions, Boolean>> checkpointConsumer = ignore -> { };
+
 	public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) {
 		this.submitConsumer = submitConsumer;
 	}
@@ -71,6 +74,10 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 		this.releasePartitionsConsumer = releasePartitionsConsumer;
 	}
 
+	public void setCheckpointConsumer(Consumer<Tuple6<ExecutionAttemptID, JobID, Long, Long, CheckpointOptions, Boolean>> checkpointConsumer) {
+		this.checkpointConsumer = checkpointConsumer;
+	}
+
 	@Override
 	public String getAddress() {
 		return address;
@@ -123,7 +130,9 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 			long checkpointId,
 			long timestamp,
 			CheckpointOptions checkpointOptions,
-			boolean advanceToEndOfEventTime) {}
+			boolean advanceToEndOfEventTime) {
+		checkpointConsumer.accept(Tuple6.of(executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime));
+	}
 
 	@Override
 	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {