You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/08/05 02:44:03 UTC
[flink] 01/02: [FLINK-18690][runtime] Introduce
ExecutionSlotSharingGroup and SlotSharingStrategy interface
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2542d86861cd5dd94feb9fcc92ef8e06398c8241
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Tue Aug 4 11:35:59 2020 +0800
[FLINK-18690][runtime] Introduce ExecutionSlotSharingGroup and SlotSharingStrategy interface
---
.../scheduler/CoLocationConstraintDesc.java | 58 ++++++++++++++++++
.../jobmanager/scheduler/CoLocationGroupDesc.java | 69 ++++++++++++++++++++++
.../scheduler/ExecutionSlotSharingGroup.java | 45 ++++++++++++++
.../runtime/scheduler/SlotSharingStrategy.java | 45 ++++++++++++++
4 files changed, 217 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java
new file mode 100644
index 0000000..461ee3c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.util.AbstractID;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A read-only and light weight version of {@link CoLocationConstraint}.
+ */
+public class CoLocationConstraintDesc {
+
+ private final AbstractID coLocationGroupId;
+
+ private final int constraintIndex;
+
+ CoLocationConstraintDesc(final AbstractID coLocationGroupId, final int constraintIndex) {
+ this.coLocationGroupId = checkNotNull(coLocationGroupId);
+ this.constraintIndex = constraintIndex;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj != null && obj.getClass() == getClass()) {
+ CoLocationConstraintDesc that = (CoLocationConstraintDesc) obj;
+ return Objects.equals(that.coLocationGroupId, this.coLocationGroupId) &&
+ that.constraintIndex == this.constraintIndex;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * coLocationGroupId.hashCode() + constraintIndex;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java
new file mode 100644
index 0000000..8f3c124
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.AbstractID;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A read-only and light weight version of {@link CoLocationGroup}.
+ */
+public class CoLocationGroupDesc {
+
+ private final AbstractID id;
+
+ private final List<JobVertexID> vertices;
+
+ private CoLocationGroupDesc(final AbstractID id, final List<JobVertexID> vertices) {
+ this.id = checkNotNull(id);
+ this.vertices = checkNotNull(vertices);
+ }
+
+ public AbstractID getId() {
+ return id;
+ }
+
+ public List<JobVertexID> getVertices() {
+ return Collections.unmodifiableList(vertices);
+ }
+
+ public CoLocationConstraintDesc getLocationConstraint(final int index) {
+ return new CoLocationConstraintDesc(id, index);
+ }
+
+ public static CoLocationGroupDesc from(final CoLocationGroup group) {
+ return new CoLocationGroupDesc(
+ group.getId(),
+ group.getVertices().stream().map(JobVertex::getID).collect(Collectors.toList()));
+ }
+
+ @VisibleForTesting
+ public static CoLocationGroupDesc from(final JobVertexID ...ids) {
+ return new CoLocationGroupDesc(new AbstractID(), Arrays.asList(ids));
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
new file mode 100644
index 0000000..45fb652
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Represents execution vertices that will run the same shared slot.
+ */
+class ExecutionSlotSharingGroup {
+
+ private final Set<ExecutionVertexID> executionVertexIds;
+
+ ExecutionSlotSharingGroup() {
+ this.executionVertexIds = new HashSet<>();
+ }
+
+ void addVertex(final ExecutionVertexID executionVertexId) {
+ executionVertexIds.add(executionVertexId);
+ }
+
+ Set<ExecutionVertexID> getExecutionVertexIds() {
+ return Collections.unmodifiableSet(executionVertexIds);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java
new file mode 100644
index 0000000..dc1815e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Set;
+
+/**
+ * Strategy which determines {@link ExecutionSlotSharingGroup} for each execution vertex.
+ */
+interface SlotSharingStrategy {
+
+ ExecutionSlotSharingGroup getExecutionSlotSharingGroup(
+ ExecutionVertexID executionVertexId);
+
+ Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups();
+
+ @FunctionalInterface
+ interface Factory {
+ SlotSharingStrategy create(
+ SchedulingTopology topology,
+ Set<SlotSharingGroup> logicalSlotSharingGroups,
+ Set<CoLocationGroupDesc> coLocationGroups);
+ }
+}