You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/25 13:08:54 UTC
[flink] branch master updated: [FLINK-28585][runtime] Ensure all the concurrent executions of SpeculativeExecutionVertex to share the same input splits
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7611928d0f1 [FLINK-28585][runtime] Ensure all the concurrent executions of SpeculativeExecutionVertex to share the same input splits
7611928d0f1 is described below
commit 7611928d0f1a7bb20ec5b0538e0fbe9102a07023
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jul 21 03:12:57 2022 +0800
[FLINK-28585][runtime] Ensure all the concurrent executions of SpeculativeExecutionVertex to share the same input splits
This allows speculative execution for source tasks of jobs with InputFormatSource.
This closes #20322.
---
.../flink/runtime/executiongraph/Execution.java | 4 +-
.../runtime/executiongraph/ExecutionVertex.java | 16 +++--
.../executiongraph/SpeculativeExecutionVertex.java | 24 ++++++++
.../runtime/scheduler/ExecutionGraphHandler.java | 7 ++-
.../SpeculativeExecutionVertexTest.java | 72 +++++++++++++++++++++-
5 files changed, 109 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 96c0d3c119a..657effb66f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -319,10 +319,10 @@ public class Execution
}
}
- public InputSplit getNextInputSplit() {
+ public Optional<InputSplit> getNextInputSplit() {
final LogicalSlot slot = this.getAssignedResource();
final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
- return this.vertex.getNextInputSplit(host);
+ return this.vertex.getNextInputSplit(host, getAttemptNumber());
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c6bf63aed12..56dc4e1067e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -238,16 +238,14 @@ public class ExecutionVertex
return allConsumedPartitions.get(input);
}
- public InputSplit getNextInputSplit(String host) {
- final int taskId = getParallelSubtaskIndex();
- synchronized (inputSplits) {
- final InputSplit nextInputSplit =
- jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
- if (nextInputSplit != null) {
- inputSplits.add(nextInputSplit);
- }
- return nextInputSplit;
+ public Optional<InputSplit> getNextInputSplit(String host, int attemptNumber) {
+ final int subtaskIndex = getParallelSubtaskIndex();
+ final InputSplit nextInputSplit =
+ jobVertex.getSplitAssigner().getNextInputSplit(host, subtaskIndex);
+ if (nextInputSplit != null) {
+ inputSplits.add(nextInputSplit);
}
+ return Optional.ofNullable(nextInputSplit);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index 93a82f91ed5..41ef1c0ac2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -27,6 +28,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -46,6 +48,8 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
private int originalAttemptNumber;
+ final Map<Integer, Integer> nextInputSplitIndexToConsumeByAttempts;
+
public SpeculativeExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
@@ -66,6 +70,7 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
this.currentExecutions = new LinkedHashMap<>();
this.currentExecutions.put(currentExecution.getAttemptNumber(), currentExecution);
this.originalAttemptNumber = currentExecution.getAttemptNumber();
+ this.nextInputSplitIndexToConsumeByAttempts = new HashMap<>();
}
public boolean containsSources() {
@@ -155,6 +160,7 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
currentExecutions.clear();
currentExecutions.put(currentExecution.getAttemptNumber(), currentExecution);
originalAttemptNumber = currentExecution.getAttemptNumber();
+ nextInputSplitIndexToConsumeByAttempts.clear();
}
@Override
@@ -180,6 +186,7 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
final Execution removedExecution =
this.currentExecutions.remove(executionAttemptId.getAttemptNumber());
+ nextInputSplitIndexToConsumeByAttempts.remove(executionAttemptId.getAttemptNumber());
checkNotNull(
removedExecution,
"Cannot remove execution %s which does not exist.",
@@ -238,6 +245,23 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
}
}
+ @Override
+ public Optional<InputSplit> getNextInputSplit(String host, int attemptNumber) {
+ final int index = nextInputSplitIndexToConsumeByAttempts.getOrDefault(attemptNumber, 0);
+ checkState(index <= inputSplits.size());
+ // Hand out splits already recorded in inputSplits, in order, before asking the assigner for a new one, so all attempts see the same split sequence.
+ if (index < inputSplits.size()) {
+ nextInputSplitIndexToConsumeByAttempts.put(attemptNumber, index + 1);
+ return Optional.of(inputSplits.get(index));
+ } else {
+ final Optional<InputSplit> split = super.getNextInputSplit(host, attemptNumber);
+ if (split.isPresent()) {
+ nextInputSplitIndexToConsumeByAttempts.put(attemptNumber, index + 1);
+ }
+ return split;
+ }
+ }
+
@Override
void notifyPendingDeployment(Execution execution) {
getExecutionGraphAccessor()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java
index 8c008f64b9b..bdfe167979f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java
@@ -202,11 +202,14 @@ public class ExecutionGraphHandler {
throw new IllegalStateException("No InputSplitAssigner for vertex ID " + vertexID);
}
- final InputSplit nextInputSplit = execution.getNextInputSplit();
+ final Optional<InputSplit> optionalNextInputSplit = execution.getNextInputSplit();
- if (nextInputSplit != null) {
+ final InputSplit nextInputSplit;
+ if (optionalNextInputSplit.isPresent()) {
+ nextInputSplit = optionalNextInputSplit.get();
log.debug("Send next input split {}.", nextInputSplit);
} else {
+ nextInputSplit = null;
log.debug("No more input splits available");
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
index 944b32278b6..749522fbe25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
@@ -19,6 +19,9 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -33,8 +36,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
@@ -229,8 +237,47 @@ class SpeculativeExecutionVertexTest {
}
}
- private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+ @Test
+ void testGetNextInputSplit() throws Exception {
+ final TestInputSource source = new TestInputSource();
final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+ jobVertex.setInputSplitSource(source);
+
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(jobVertex);
+
+ final int numExecutions = 3;
+ for (int i = 0; i < numExecutions - 1; ++i) {
+ ev.createNewSpeculativeExecution(0);
+ }
+ final List<Execution> executions = new ArrayList<>(ev.getCurrentExecutions());
+
+ final Map<Integer, List<InputSplit>> splitsOfAttempts = new HashMap<>();
+ final Random rand = new Random();
+ while (executions.size() > 0) {
+ final int index = rand.nextInt(executions.size());
+ final Execution execution = executions.get(index);
+ final Optional<InputSplit> split = execution.getNextInputSplit();
+ if (split.isPresent()) {
+ splitsOfAttempts
+ .computeIfAbsent(execution.getAttemptNumber(), k -> new ArrayList<>())
+ .add(split.get());
+ } else {
+ executions.remove(index);
+ }
+ }
+
+ assertThat(splitsOfAttempts).hasSize(numExecutions);
+ assertThat(splitsOfAttempts.get(0)).containsExactlyInAnyOrder(source.splits);
+ assertThat(splitsOfAttempts.get(1)).isEqualTo(splitsOfAttempts.get(0));
+ assertThat(splitsOfAttempts.get(2)).isEqualTo(splitsOfAttempts.get(0));
+ }
+
+ private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+ return createSpeculativeExecutionVertex(ExecutionGraphTestUtils.createNoOpVertex(1));
+ }
+
+ private SpeculativeExecutionVertex createSpeculativeExecutionVertex(final JobVertex jobVertex)
+ throws Exception {
final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
return (SpeculativeExecutionVertex)
@@ -249,4 +296,27 @@ class SpeculativeExecutionVertexTest {
return executionGraph;
}
+
+ private static class TestInputSource extends GenericInputFormat<Integer> { // static: no hidden reference to the enclosing test instance
+ private GenericInputSplit[] splits; // splits created by the last createInputSplits() call; read by the test's assertions
+ @Override
+ public GenericInputSplit[] createInputSplits(int numSplitsHint) {
+ final int numSplits = numSplitsHint * 10;
+ splits = new GenericInputSplit[numSplits];
+ for (int i = 0; i < numSplits; ++i) {
+ splits[i] = new GenericInputSplit(i, numSplits);
+ }
+ return splits;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return false;
+ }
+
+ @Override
+ public Integer nextRecord(Integer reuse) throws IOException {
+ return null;
+ }
+ }
}