You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/05 01:29:13 UTC
[flink] 02/02: [FLINK-21269][runtime] Introduce runtime interfaces for specifying SlotSharingGroup-based resource requirements
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 33cb2fdfc2aea11a933b717838cd1bbf431af9c7
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Tue Nov 24 17:47:38 2020 +0800
[FLINK-21269][runtime] Introduce runtime interfaces for specifying SlotSharingGroup-based resource requirements
The specified resource requirements will be passed on all the way to the corresponding SlotSharingGroup in ExecutionGraph.
This closes #14860
---
.../flink/streaming/api/graph/StreamGraph.java | 13 ++++
.../streaming/api/graph/StreamGraphGenerator.java | 19 ++++++
.../api/graph/StreamingJobGraphGenerator.java | 16 ++++-
.../api/graph/StreamGraphGeneratorTest.java | 49 +++++++++++++++
.../api/graph/StreamingJobGraphGeneratorTest.java | 73 ++++++++++++++++++++++
5 files changed, 169 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 095543c..687c4a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -70,6 +71,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -119,6 +121,7 @@ public class StreamGraph implements Pipeline {
private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
private InternalTimeServiceManager.Provider timerServiceProvider;
private JobType jobType = JobType.STREAMING;
+ private Map<String, ResourceProfile> slotSharingGroupResources;
public StreamGraph(
ExecutionConfig executionConfig,
@@ -142,6 +145,7 @@ public class StreamGraph implements Pipeline {
iterationSourceSinkPairs = new HashSet<>();
sources = new HashSet<>();
sinks = new HashSet<>();
+ slotSharingGroupResources = new HashMap<>();
}
public ExecutionConfig getExecutionConfig() {
@@ -229,6 +233,15 @@ public class StreamGraph implements Pipeline {
this.globalDataExchangeMode = globalDataExchangeMode;
}
+    /**
+     * Adds fine-grained resource requirements for slot sharing groups, keyed by slot sharing group
+     * name.
+     *
+     * <p>The given entries are merged into the requirements already registered on this graph: a
+     * group contained in {@code slotSharingGroupResources} overwrites its previously registered
+     * profile, while groups not contained in the map keep theirs.
+     *
+     * @param slotSharingGroupResources resource profile per slot sharing group name; must not be
+     *     null
+     */
+    public void setSlotSharingGroupResource(
+            Map<String, ResourceProfile> slotSharingGroupResources) {
+        checkNotNull(slotSharingGroupResources, "slotSharingGroupResources must not be null");
+        this.slotSharingGroupResources.putAll(slotSharingGroupResources);
+    }
+
+    /**
+     * Returns the resource profile registered for the given slot sharing group, if any.
+     *
+     * @param groupId name of the slot sharing group
+     * @return the registered profile, or {@link Optional#empty()} if no profile was registered for
+     *     the group
+     */
+    public Optional<ResourceProfile> getSlotSharingGroupResource(String groupId) {
+        return Optional.ofNullable(slotSharingGroupResources.get(groupId));
+    }
+
/**
* Set whether to put all vertices into the same slot sharing group by default.
*
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 7192437..a33acc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
@@ -138,6 +139,9 @@ public class StreamGraphGenerator {
private final ReadableConfig configuration;
+ // Records the slot sharing groups and their corresponding ResourceProfile
+ private final Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>();
+
private StateBackend stateBackend;
private CheckpointStorage checkpointStorage;
@@ -259,6 +263,20 @@ public class StreamGraphGenerator {
return this;
}
+    /**
+     * Specifies fine-grained resource requirements for slot sharing groups.
+     *
+     * <p>Note that a slot sharing group only hints to the scheduler that the grouped operators
+     * CAN be deployed into a shared slot. There is no guarantee that the scheduler always deploys
+     * the grouped operators together. In cases where grouped operators are deployed into separate
+     * slots, the slot resources will be derived from the specified group requirements.
+     *
+     * <p>The given entries are merged into the requirements collected by earlier calls; a group
+     * that is specified again overwrites its previous profile.
+     *
+     * @param slotSharingGroupResources resource profile per slot sharing group name; must not be
+     *     null
+     * @return this generator, to allow method chaining
+     */
+    public StreamGraphGenerator setSlotSharingGroupResource(
+            Map<String, ResourceProfile> slotSharingGroupResources) {
+        this.slotSharingGroupResources.putAll(slotSharingGroupResources);
+        return this;
+    }
+
public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
this.savepointRestoreSettings = savepointRestoreSettings;
}
@@ -291,6 +309,7 @@ public class StreamGraphGenerator {
graph.setTimeCharacteristic(timeCharacteristic);
graph.setJobName(jobName);
graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : JobType.STREAMING);
+ graph.setSlotSharingGroupResource(slotSharingGroupResources);
if (shouldExecuteInBatchMode) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 50e4c1b..6ab5a4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -966,7 +966,14 @@ public class StreamingJobGraphGenerator {
} else {
effectiveSlotSharingGroup =
specifiedSlotSharingGroups.computeIfAbsent(
- slotSharingGroupKey, k -> new SlotSharingGroup());
+ slotSharingGroupKey,
+ k -> {
+ SlotSharingGroup ssg = new SlotSharingGroup();
+ streamGraph
+ .getSlotSharingGroupResource(k)
+ .ifPresent(ssg::setResourceProfile);
+ return ssg;
+ });
}
vertex.setSlotSharingGroup(effectiveSlotSharingGroup);
@@ -981,6 +988,9 @@ public class StreamingJobGraphGenerator {
private Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups() {
final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups = new HashMap<>();
final SlotSharingGroup defaultSlotSharingGroup = new SlotSharingGroup();
+ streamGraph
+ .getSlotSharingGroupResource(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+ .ifPresent(defaultSlotSharingGroup::setResourceProfile);
final boolean allRegionsInSameSlotSharingGroup =
streamGraph.isAllVerticesInSameSlotSharingGroupByDefault();
@@ -993,6 +1003,10 @@ public class StreamingJobGraphGenerator {
regionSlotSharingGroup = defaultSlotSharingGroup;
} else {
regionSlotSharingGroup = new SlotSharingGroup();
+ streamGraph
+ .getSlotSharingGroupResource(
+ StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+ .ifPresent(regionSlotSharingGroup::setResourceProfile);
}
for (JobVertexID jobVertexID : region.getVertexIDs()) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index c459bd8..e80b189 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -66,8 +67,11 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -525,6 +529,51 @@ public class StreamGraphGeneratorTest extends TestLogger {
}
}
+    /**
+     * Tests that slot sharing group resources configured on the {@link StreamGraphGenerator} are
+     * handed over to the generated {@link StreamGraph}.
+     */
+    @Test
+    public void testSetSlotSharingResource() {
+        final String slotSharingGroup1 = "a";
+        final String slotSharingGroup2 = "b";
+        final String unconfiguredSlotSharingGroup = "c";
+        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+        final ResourceProfile resourceProfile3 = ResourceProfile.fromResources(3, 30);
+        final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
+        slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1);
+        slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2);
+        slotSharingGroupResource.put(
+                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile3);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStream<Integer> sourceDataStream =
+                env.fromElements(1, 2, 3).slotSharingGroup(slotSharingGroup1);
+        final DataStream<Integer> mapDataStream1 =
+                sourceDataStream.map(x -> x + 1).slotSharingGroup(slotSharingGroup2);
+        final DataStream<Integer> mapDataStream2 = mapDataStream1.map(x -> x * 2);
+
+        final List<Transformation<?>> transformations = new ArrayList<>();
+        transformations.add(sourceDataStream.getTransformation());
+        transformations.add(mapDataStream1.getTransformation());
+        transformations.add(mapDataStream2.getTransformation());
+
+        // the resources configured on the generator must end up in the generated stream graph
+        final StreamGraph streamGraph =
+                new StreamGraphGenerator(
+                                transformations, env.getConfig(), env.getCheckpointConfig())
+                        .setSlotSharingGroupResource(slotSharingGroupResource)
+                        .generate();
+
+        assertThat(
+                streamGraph.getSlotSharingGroupResource(slotSharingGroup1).get(),
+                equalTo(resourceProfile1));
+        assertThat(
+                streamGraph.getSlotSharingGroupResource(slotSharingGroup2).get(),
+                equalTo(resourceProfile2));
+        assertThat(
+                streamGraph
+                        .getSlotSharingGroupResource(
+                                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                        .get(),
+                equalTo(resourceProfile3));
+        // a group without configured resources reports no requirement
+        assertThat(
+                streamGraph.getSlotSharingGroupResource(unconfiguredSlotSharingGroup).isPresent(),
+                is(false));
+    }
+
static class OutputTypeConfigurableOperationWithTwoInputs
extends AbstractStreamOperator<Integer>
implements TwoInputStreamOperator<Integer, Integer, Integer>,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 43c1426..c15a13b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
@@ -121,6 +122,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/** Tests for {@link StreamingJobGraphGenerator}. */
@SuppressWarnings("serial")
@@ -1196,6 +1198,77 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
+    public void testSlotSharingResourceConfiguration() {
+        // Each operator is placed in its own slot sharing group and named after it, so every job
+        // vertex can be matched to the resource profile expected for its group.
+        final String slotSharingGroup1 = "slot-a";
+        final String slotSharingGroup2 = "slot-b";
+        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+        final ResourceProfile resourceProfile3 = ResourceProfile.fromResources(3, 30);
+        final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
+        slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1);
+        slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2);
+        slotSharingGroupResource.put(
+                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile3);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromElements(1, 2, 3)
+                .name(slotSharingGroup1)
+                .slotSharingGroup(slotSharingGroup1)
+                .map(x -> x + 1)
+                .name(slotSharingGroup2)
+                .slotSharingGroup(slotSharingGroup2)
+                .map(x -> x * x)
+                .name(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                .slotSharingGroup(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setSlotSharingGroupResource(slotSharingGroupResource);
+        final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+        int numVertex = 0;
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            numVertex += 1;
+            if (jobVertex.getName().contains(slotSharingGroup1)) {
+                assertEquals(
+                        resourceProfile1, jobVertex.getSlotSharingGroup().getResourceProfile());
+            } else if (jobVertex.getName().contains(slotSharingGroup2)) {
+                assertEquals(
+                        resourceProfile2, jobVertex.getSlotSharingGroup().getResourceProfile());
+            } else if (jobVertex
+                    .getName()
+                    .contains(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)) {
+                assertEquals(
+                        resourceProfile3, jobVertex.getSlotSharingGroup().getResourceProfile());
+            } else {
+                fail("Unexpected job vertex: " + jobVertex.getName());
+            }
+        }
+        assertThat(numVertex, is(3));
+    }
+
+    /**
+     * Tests that the resource profile configured for the default slot sharing group is applied to
+     * all job vertices that do not specify a slot sharing group.
+     */
+    @Test
+    public void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
+        final ResourceProfile resourceProfile = ResourceProfile.fromResources(1, 10);
+        final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
+        slotSharingGroupResource.put(
+                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // Source and map share the default slot sharing group, so they may be chained into a
+        // single job vertex whenever both run with the same parallelism (e.g. a default local
+        // parallelism of 1). Disable chaining so the expected vertex count does not depend on the
+        // machine running the test.
+        env.disableOperatorChaining();
+        env.fromElements(1, 2, 3).map(x -> x + 1);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setSlotSharingGroupResource(slotSharingGroupResource);
+        final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+        int numVertex = 0;
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            numVertex += 1;
+            assertEquals(resourceProfile, jobVertex.getSlotSharingGroup().getResourceProfile());
+        }
+        assertThat(numVertex, is(2));
+    }
+
+ @Test
public void testNamingOfChainedMultipleInputs() {
String[] sources = new String[] {"source-1", "source-2", "source-3"};
JobGraph graph = createGraphWithMultipleInputs(true, sources);