You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/25 13:08:54 UTC

[flink] branch master updated: [FLINK-28585][runtime] Ensure all the concurrent executions of SpeculativeExecutionVertex to share the same input splits

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7611928d0f1 [FLINK-28585][runtime] Ensure all the concurrent executions of SpeculativeExecutionVertex to share the same input splits
7611928d0f1 is described below

commit 7611928d0f1a7bb20ec5b0538e0fbe9102a07023
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jul 21 03:12:57 2022 +0800

    [FLINK-28585][runtime] Ensure all the concurrent executions of SpeculativeExecutionVertex to share the same input splits
    
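    This allows speculative execution of source tasks in jobs that use InputFormatSource. Each attempt first replays, in assignment order, the splits already assigned to the vertex and only requests a new split from the assigner once it has consumed all of them, so every concurrent attempt processes the same splits.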
    
    This closes #20322.
---
 .../flink/runtime/executiongraph/Execution.java    |  4 +-
 .../runtime/executiongraph/ExecutionVertex.java    | 16 +++--
 .../executiongraph/SpeculativeExecutionVertex.java | 24 ++++++++
 .../runtime/scheduler/ExecutionGraphHandler.java   |  7 ++-
 .../SpeculativeExecutionVertexTest.java            | 72 +++++++++++++++++++++-
 5 files changed, 109 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 96c0d3c119a..657effb66f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -319,10 +319,10 @@ public class Execution
         }
     }
 
-    public InputSplit getNextInputSplit() {
+    public Optional<InputSplit> getNextInputSplit() {
         final LogicalSlot slot = this.getAssignedResource();
         final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
-        return this.vertex.getNextInputSplit(host);
+        return this.vertex.getNextInputSplit(host, getAttemptNumber());
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c6bf63aed12..56dc4e1067e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -238,16 +238,14 @@ public class ExecutionVertex
         return allConsumedPartitions.get(input);
     }
 
-    public InputSplit getNextInputSplit(String host) {
-        final int taskId = getParallelSubtaskIndex();
-        synchronized (inputSplits) {
-            final InputSplit nextInputSplit =
-                    jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
-            if (nextInputSplit != null) {
-                inputSplits.add(nextInputSplit);
-            }
-            return nextInputSplit;
+    public Optional<InputSplit> getNextInputSplit(String host, int attemptNumber) {
+        final int subtaskIndex = getParallelSubtaskIndex();
+        final InputSplit nextInputSplit =
+                jobVertex.getSplitAssigner().getNextInputSplit(host, subtaskIndex);
+        if (nextInputSplit != null) {
+            inputSplits.add(nextInputSplit);
         }
+        return Optional.ofNullable(nextInputSplit);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index 93a82f91ed5..41ef1c0ac2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -27,6 +28,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +48,8 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
 
     private int originalAttemptNumber;
 
+    final Map<Integer, Integer> nextInputSplitIndexToConsumeByAttempts;
+
     public SpeculativeExecutionVertex(
             ExecutionJobVertex jobVertex,
             int subTaskIndex,
@@ -66,6 +70,7 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
         this.currentExecutions = new LinkedHashMap<>();
         this.currentExecutions.put(currentExecution.getAttemptNumber(), currentExecution);
         this.originalAttemptNumber = currentExecution.getAttemptNumber();
+        this.nextInputSplitIndexToConsumeByAttempts = new HashMap<>();
     }
 
     public boolean containsSources() {
@@ -155,6 +160,7 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
         currentExecutions.clear();
         currentExecutions.put(currentExecution.getAttemptNumber(), currentExecution);
         originalAttemptNumber = currentExecution.getAttemptNumber();
+        nextInputSplitIndexToConsumeByAttempts.clear();
     }
 
     @Override
@@ -180,6 +186,7 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
 
         final Execution removedExecution =
                 this.currentExecutions.remove(executionAttemptId.getAttemptNumber());
+        nextInputSplitIndexToConsumeByAttempts.remove(executionAttemptId.getAttemptNumber());
         checkNotNull(
                 removedExecution,
                 "Cannot remove execution %s which does not exist.",
@@ -238,6 +245,23 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
         }
     }
 
+    @Override
+    public Optional<InputSplit> getNextInputSplit(String host, int attemptNumber) {
+        final int index = nextInputSplitIndexToConsumeByAttempts.getOrDefault(attemptNumber, 0);
+        checkState(index <= inputSplits.size());
+
+        if (index < inputSplits.size()) {
+            nextInputSplitIndexToConsumeByAttempts.put(attemptNumber, index + 1);
+            return Optional.of(inputSplits.get(index));
+        } else {
+            final Optional<InputSplit> split = super.getNextInputSplit(host, attemptNumber);
+            if (split.isPresent()) {
+                nextInputSplitIndexToConsumeByAttempts.put(attemptNumber, index + 1);
+            }
+            return split;
+        }
+    }
+
     @Override
     void notifyPendingDeployment(Execution execution) {
         getExecutionGraphAccessor()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java
index 8c008f64b9b..bdfe167979f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java
@@ -202,11 +202,14 @@ public class ExecutionGraphHandler {
             throw new IllegalStateException("No InputSplitAssigner for vertex ID " + vertexID);
         }
 
-        final InputSplit nextInputSplit = execution.getNextInputSplit();
+        final Optional<InputSplit> optionalNextInputSplit = execution.getNextInputSplit();
 
-        if (nextInputSplit != null) {
+        final InputSplit nextInputSplit;
+        if (optionalNextInputSplit.isPresent()) {
+            nextInputSplit = optionalNextInputSplit.get();
             log.debug("Send next input split {}.", nextInputSplit);
         } else {
+            nextInputSplit = null;
             log.debug("No more input splits available");
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
index 944b32278b6..749522fbe25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
@@ -19,6 +19,9 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -33,8 +36,13 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -229,8 +237,47 @@ class SpeculativeExecutionVertexTest {
         }
     }
 
-    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+    @Test
+    void testGetNextInputSplit() throws Exception {
+        final TestInputSource source = new TestInputSource();
         final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        jobVertex.setInputSplitSource(source);
+
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(jobVertex);
+
+        final int numExecutions = 3;
+        for (int i = 0; i < numExecutions - 1; ++i) {
+            ev.createNewSpeculativeExecution(0);
+        }
+        final List<Execution> executions = new ArrayList<>(ev.getCurrentExecutions());
+
+        final Map<Integer, List<InputSplit>> splitsOfAttempts = new HashMap<>();
+        final Random rand = new Random();
+        while (executions.size() > 0) {
+            final int index = rand.nextInt(executions.size());
+            final Execution execution = executions.get(index);
+            final Optional<InputSplit> split = execution.getNextInputSplit();
+            if (split.isPresent()) {
+                splitsOfAttempts
+                        .computeIfAbsent(execution.getAttemptNumber(), k -> new ArrayList<>())
+                        .add(split.get());
+            } else {
+                executions.remove(index);
+            }
+        }
+
+        assertThat(splitsOfAttempts).hasSize(numExecutions);
+        assertThat(splitsOfAttempts.get(0)).containsExactlyInAnyOrder(source.splits);
+        assertThat(splitsOfAttempts.get(1)).isEqualTo(splitsOfAttempts.get(0));
+        assertThat(splitsOfAttempts.get(2)).isEqualTo(splitsOfAttempts.get(0));
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+        return createSpeculativeExecutionVertex(ExecutionGraphTestUtils.createNoOpVertex(1));
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex(final JobVertex jobVertex)
+            throws Exception {
         final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
         final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
         return (SpeculativeExecutionVertex)
@@ -249,4 +296,27 @@ class SpeculativeExecutionVertexTest {
 
         return executionGraph;
     }
+
+    private class TestInputSource extends GenericInputFormat<Integer> {
+        private GenericInputSplit[] splits;
+
+        public GenericInputSplit[] createInputSplits(int numSplitsHint) {
+            final int numSplits = numSplitsHint * 10;
+            splits = new GenericInputSplit[numSplits];
+            for (int i = 0; i < numSplits; ++i) {
+                splits[i] = new GenericInputSplit(i, numSplits);
+            }
+            return splits;
+        }
+
+        @Override
+        public boolean reachedEnd() throws IOException {
+            return false;
+        }
+
+        @Override
+        public Integer nextRecord(Integer reuse) throws IOException {
+            return null;
+        }
+    }
 }