You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/10/28 08:22:35 UTC
[flink] 05/08: [FLINK-13904][tests] Support checkpoint consumer of
SimpleAckingTaskManagerGateway
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc196737b818261d039d6ecb2c1555c340f0e2c0
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Sun Sep 29 14:50:09 2019 +0800
[FLINK-13904][tests] Support checkpoint consumer of SimpleAckingTaskManagerGateway
---
.../executiongraph/utils/SimpleAckingTaskManagerGateway.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 0d07f3d..e09d8be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -55,6 +56,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
private BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = (ignore1, ignore2) -> { };
+ private Consumer<Tuple6<ExecutionAttemptID, JobID, Long, Long, CheckpointOptions, Boolean>> checkpointConsumer = ignore -> { };
+
public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) {
this.submitConsumer = submitConsumer;
}
@@ -71,6 +74,10 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
this.releasePartitionsConsumer = releasePartitionsConsumer;
}
+ public void setCheckpointConsumer(Consumer<Tuple6<ExecutionAttemptID, JobID, Long, Long, CheckpointOptions, Boolean>> checkpointConsumer) {
+ this.checkpointConsumer = checkpointConsumer;
+ }
+
@Override
public String getAddress() {
return address;
@@ -123,7 +130,9 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {}
+ boolean advanceToEndOfEventTime) {
+ checkpointConsumer.accept(Tuple6.of(executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime));
+ }
@Override
public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {