You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/05 01:29:13 UTC

[flink] 02/02: [FLINK-21269][runtime] Introduce runtime interfaces for specifying SlotSharingGroup-based resource requirements

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33cb2fdfc2aea11a933b717838cd1bbf431af9c7
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Tue Nov 24 17:47:38 2020 +0800

    [FLINK-21269][runtime] Introduce runtime interfaces for specifying SlotSharingGroup-based resource requirements
    
    The specified resource requirements will be passed all the way down to the corresponding SlotSharingGroup in the ExecutionGraph.
    
    This closes #14860
---
 .../flink/streaming/api/graph/StreamGraph.java     | 13 ++++
 .../streaming/api/graph/StreamGraphGenerator.java  | 19 ++++++
 .../api/graph/StreamingJobGraphGenerator.java      | 16 ++++-
 .../api/graph/StreamGraphGeneratorTest.java        | 49 +++++++++++++++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 73 ++++++++++++++++++++++
 5 files changed, 169 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 095543c..687c4a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -70,6 +71,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -119,6 +121,7 @@ public class StreamGraph implements Pipeline {
     private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
     private InternalTimeServiceManager.Provider timerServiceProvider;
     private JobType jobType = JobType.STREAMING;
+    private Map<String, ResourceProfile> slotSharingGroupResources;
 
     public StreamGraph(
             ExecutionConfig executionConfig,
@@ -142,6 +145,7 @@ public class StreamGraph implements Pipeline {
         iterationSourceSinkPairs = new HashSet<>();
         sources = new HashSet<>();
         sinks = new HashSet<>();
+        slotSharingGroupResources = new HashMap<>();
     }
 
     public ExecutionConfig getExecutionConfig() {
@@ -229,6 +233,28 @@ public class StreamGraph implements Pipeline {
         this.globalDataExchangeMode = globalDataExchangeMode;
     }
 
+    /**
+     * Sets the resource requirements of slot sharing groups, keyed by slot sharing group name.
+     * The given entries are merged into, and override, any previously set requirements.
+     *
+     * @param slotSharingGroupResources the resource profile of each slot sharing group
+     * @throws NullPointerException if {@code slotSharingGroupResources} is null
+     */
+    public void setSlotSharingGroupResource(
+            Map<String, ResourceProfile> slotSharingGroupResources) {
+        this.slotSharingGroupResources.putAll(checkNotNull(slotSharingGroupResources));
+    }
+
+    /**
+     * Returns the resource profile specified for the given slot sharing group.
+     *
+     * @param groupId the name of the slot sharing group
+     * @return the specified resource profile, or empty if none was specified for the group
+     */
+    public Optional<ResourceProfile> getSlotSharingGroupResource(String groupId) {
+        return Optional.ofNullable(slotSharingGroupResources.get(groupId));
+    }
+
     /**
      * Set whether to put all vertices into the same slot sharing group by default.
      *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 7192437..a33acc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
@@ -138,6 +139,9 @@ public class StreamGraphGenerator {
 
     private final ReadableConfig configuration;
 
+    // Records the slot sharing groups and their corresponding ResourceProfile
+    private final Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>();
+
     private StateBackend stateBackend;
 
     private CheckpointStorage checkpointStorage;
@@ -259,6 +263,20 @@ public class StreamGraphGenerator {
         return this;
     }
 
+    /**
+     * Specifies fine-grained resource requirements for slot sharing groups, keyed by group name.
+     *
+     * <p>Note that a slot sharing group hints to the scheduler that the grouped operators CAN be
+     * deployed into a shared slot. There's no guarantee that the scheduler always deploys the
+     * grouped operators together. In cases where grouped operators are deployed into separate
+     * slots, the slot resources will be derived from the specified group requirements.
+     */
+    public StreamGraphGenerator setSlotSharingGroupResource(
+            Map<String, ResourceProfile> slotSharingGroupResources) {
+        this.slotSharingGroupResources.putAll(slotSharingGroupResources);
+        return this;
+    }
+
     public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
         this.savepointRestoreSettings = savepointRestoreSettings;
     }
@@ -291,6 +309,7 @@ public class StreamGraphGenerator {
         graph.setTimeCharacteristic(timeCharacteristic);
         graph.setJobName(jobName);
         graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : JobType.STREAMING);
+        graph.setSlotSharingGroupResource(slotSharingGroupResources);
 
         if (shouldExecuteInBatchMode) {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 50e4c1b..6ab5a4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -966,7 +966,14 @@ public class StreamingJobGraphGenerator {
             } else {
                 effectiveSlotSharingGroup =
                         specifiedSlotSharingGroups.computeIfAbsent(
-                                slotSharingGroupKey, k -> new SlotSharingGroup());
+                                slotSharingGroupKey,
+                                k -> {
+                                    SlotSharingGroup ssg = new SlotSharingGroup();
+                                    streamGraph
+                                            .getSlotSharingGroupResource(k)
+                                            .ifPresent(ssg::setResourceProfile);
+                                    return ssg;
+                                });
             }
 
             vertex.setSlotSharingGroup(effectiveSlotSharingGroup);
@@ -981,6 +988,9 @@ public class StreamingJobGraphGenerator {
     private Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups() {
         final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups = new HashMap<>();
         final SlotSharingGroup defaultSlotSharingGroup = new SlotSharingGroup();
+        streamGraph
+                .getSlotSharingGroupResource(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                .ifPresent(defaultSlotSharingGroup::setResourceProfile);
 
         final boolean allRegionsInSameSlotSharingGroup =
                 streamGraph.isAllVerticesInSameSlotSharingGroupByDefault();
@@ -993,6 +1003,10 @@ public class StreamingJobGraphGenerator {
                 regionSlotSharingGroup = defaultSlotSharingGroup;
             } else {
                 regionSlotSharingGroup = new SlotSharingGroup();
+                streamGraph
+                        .getSlotSharingGroupResource(
+                                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                        .ifPresent(regionSlotSharingGroup::setResourceProfile);
             }
 
             for (JobVertexID jobVertexID : region.getVertexIDs()) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index c459bd8..e80b189 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -66,8 +67,11 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -525,6 +529,51 @@ public class StreamGraphGeneratorTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testSetSlotSharingResource() {
+        final String slotSharingGroup1 = "a";
+        final String slotSharingGroup2 = "b";
+        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+        final ResourceProfile resourceProfile3 = ResourceProfile.fromResources(3, 30);
+        final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
+        slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1);
+        slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2);
+        slotSharingGroupResource.put(
+                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile3);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStream<Integer> sourceDataStream =
+                env.fromElements(1, 2, 3).slotSharingGroup(slotSharingGroup1);
+        final DataStream<Integer> mapDataStream1 =
+                sourceDataStream.map(x -> x + 1).slotSharingGroup(slotSharingGroup2);
+        final DataStream<Integer> mapDataStream2 = mapDataStream1.map(x -> x * 2);
+
+        final List<Transformation<?>> transformations = new ArrayList<>();
+        transformations.add(sourceDataStream.getTransformation());
+        transformations.add(mapDataStream1.getTransformation());
+        transformations.add(mapDataStream2.getTransformation());
+
+        // resources set on the generator must be forwarded to the generated StreamGraph
+        final StreamGraph streamGraph =
+                new StreamGraphGenerator(
+                                transformations, env.getConfig(), env.getCheckpointConfig())
+                        .setSlotSharingGroupResource(slotSharingGroupResource)
+                        .generate();
+
+        assertThat(
+                streamGraph.getSlotSharingGroupResource(slotSharingGroup1).get(),
+                equalTo(resourceProfile1));
+        assertThat(
+                streamGraph.getSlotSharingGroupResource(slotSharingGroup2).get(),
+                equalTo(resourceProfile2));
+        assertThat(
+                streamGraph
+                        .getSlotSharingGroupResource(
+                                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                        .get(),
+                equalTo(resourceProfile3));
+    }
+
     static class OutputTypeConfigurableOperationWithTwoInputs
             extends AbstractStreamOperator<Integer>
             implements TwoInputStreamOperator<Integer, Integer, Integer>,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 43c1426..c15a13b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
 import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
@@ -121,6 +122,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /** Tests for {@link StreamingJobGraphGenerator}. */
 @SuppressWarnings("serial")
@@ -1196,6 +1198,77 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
+    public void testSlotSharingResourceConfiguration() {
+        final String slotSharingGroup1 = "slot-a";
+        final String slotSharingGroup2 = "slot-b";
+        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+        final ResourceProfile resourceProfile3 = ResourceProfile.fromResources(3, 30);
+        final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
+        slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1);
+        slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2);
+        slotSharingGroupResource.put(
+                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile3);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromElements(1, 2, 3)
+                .name(slotSharingGroup1)
+                .slotSharingGroup(slotSharingGroup1)
+                .map(x -> x + 1)
+                .name(slotSharingGroup2)
+                .slotSharingGroup(slotSharingGroup2)
+                .map(x -> x * x)
+                .name(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                .slotSharingGroup(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setSlotSharingGroupResource(slotSharingGroupResource);
+        final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        // every job vertex must carry the resource profile of its slot sharing group
+        int numVertex = 0;
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            numVertex += 1;
+            if (jobVertex.getName().contains(slotSharingGroup1)) {
+                assertEquals(
+                        resourceProfile1, jobVertex.getSlotSharingGroup().getResourceProfile());
+            } else if (jobVertex.getName().contains(slotSharingGroup2)) {
+                assertEquals(
+                        resourceProfile2, jobVertex.getSlotSharingGroup().getResourceProfile());
+            } else if (jobVertex
+                    .getName()
+                    .contains(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)) {
+                assertEquals(
+                        resourceProfile3, jobVertex.getSlotSharingGroup().getResourceProfile());
+            } else {
+                fail("Unexpected job vertex: " + jobVertex.getName());
+            }
+        }
+        assertThat(numVertex, is(3));
+    }
+
+    @Test
+    public void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
+        final ResourceProfile resourceProfile = ResourceProfile.fromResources(1, 10);
+        final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
+        slotSharingGroupResource.put(
+                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromElements(1, 2, 3).map(x -> x + 1);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setSlotSharingGroupResource(slotSharingGroupResource);
+        final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        // no group is set explicitly, so every vertex must get the default group's resources
+        int numVertex = 0;
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            numVertex += 1;
+            assertEquals(resourceProfile, jobVertex.getSlotSharingGroup().getResourceProfile());
+        }
+        assertThat(numVertex, is(2));
+    }
+
+    @Test
     public void testNamingOfChainedMultipleInputs() {
         String[] sources = new String[] {"source-1", "source-2", "source-3"};
         JobGraph graph = createGraphWithMultipleInputs(true, sources);