You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/08/05 02:44:03 UTC

[flink] 01/02: [FLINK-18690][runtime] Introduce ExecutionSlotSharingGroup and SlotSharingStrategy interface

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2542d86861cd5dd94feb9fcc92ef8e06398c8241
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Tue Aug 4 11:35:59 2020 +0800

    [FLINK-18690][runtime] Introduce ExecutionSlotSharingGroup and SlotSharingStrategy interface
---
 .../scheduler/CoLocationConstraintDesc.java        | 58 ++++++++++++++++++
 .../jobmanager/scheduler/CoLocationGroupDesc.java  | 69 ++++++++++++++++++++++
 .../scheduler/ExecutionSlotSharingGroup.java       | 45 ++++++++++++++
 .../runtime/scheduler/SlotSharingStrategy.java     | 45 ++++++++++++++
 4 files changed, 217 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java
new file mode 100644
index 0000000..461ee3c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.util.AbstractID;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A read-only, lightweight counterpart of {@link CoLocationConstraint}.
+ *
+ * <p>An instance is described by the id of a co-location group together with a constraint
+ * index. {@link #equals(Object)} and {@link #hashCode()} are defined over exactly these two
+ * values.
+ */
+public class CoLocationConstraintDesc {
+
+	/** Id of the co-location group this descriptor refers to; never null. */
+	private final AbstractID coLocationGroupId;
+
+	/** Index of the constraint; stored as given, without any range check. */
+	private final int constraintIndex;
+
+	/**
+	 * Creates a descriptor from a group id and a constraint index.
+	 *
+	 * @param coLocationGroupId id of the co-location group, must not be null
+	 * @param constraintIndex index of the constraint
+	 */
+	CoLocationConstraintDesc(final AbstractID coLocationGroupId, final int constraintIndex) {
+		this.coLocationGroupId = checkNotNull(coLocationGroupId);
+		this.constraintIndex = constraintIndex;
+	}
+
+	/**
+	 * Two descriptors are equal when they have the same runtime class, equal group ids and the
+	 * same constraint index.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		// Require the exact same runtime class rather than an instanceof relationship.
+		if (obj == null || getClass() != obj.getClass()) {
+			return false;
+		}
+		final CoLocationConstraintDesc other = (CoLocationConstraintDesc) obj;
+		final boolean sameGroup = Objects.equals(other.coLocationGroupId, coLocationGroupId);
+		return sameGroup && other.constraintIndex == constraintIndex;
+	}
+
+	/**
+	 * Combines the hash of the group id with the constraint index, consistent with
+	 * {@link #equals(Object)}.
+	 */
+	@Override
+	public int hashCode() {
+		final int groupHash = coLocationGroupId.hashCode();
+		return 31 * groupHash + constraintIndex;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java
new file mode 100644
index 0000000..8f3c124
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.AbstractID;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A read-only and lightweight version of {@link CoLocationGroup}.
+ *
+ * <p>It holds only the id of the group and the ids of the job vertices that belong to it.
+ */
+public class CoLocationGroupDesc {
+
+	/** Id of the described co-location group; never null. */
+	private final AbstractID id;
+
+	/** Ids of the job vertices in the group; never null and only exposed through a read-only view. */
+	private final List<JobVertexID> vertices;
+
+	/**
+	 * Creates a descriptor. Callers must hand over a list that nobody else can modify, because
+	 * the list is stored as is.
+	 *
+	 * @param id id of the co-location group, must not be null
+	 * @param vertices ids of the job vertices in the group, must not be null
+	 */
+	private CoLocationGroupDesc(final AbstractID id, final List<JobVertexID> vertices) {
+		this.id = checkNotNull(id);
+		this.vertices = checkNotNull(vertices);
+	}
+
+	/**
+	 * Returns the id of the described co-location group.
+	 *
+	 * @return the group id, never null
+	 */
+	public AbstractID getId() {
+		return id;
+	}
+
+	/**
+	 * Returns the ids of the job vertices in this group.
+	 *
+	 * @return an unmodifiable view of the vertex ids
+	 */
+	public List<JobVertexID> getVertices() {
+		return Collections.unmodifiableList(vertices);
+	}
+
+	/**
+	 * Creates the descriptor of the constraint with the given index in this group.
+	 *
+	 * <p>A new descriptor is created on every call and the index is not validated here.
+	 *
+	 * @param index index of the constraint
+	 * @return a descriptor built from this group's id and the given index
+	 */
+	public CoLocationConstraintDesc getLocationConstraint(final int index) {
+		return new CoLocationConstraintDesc(id, index);
+	}
+
+	/**
+	 * Creates a descriptor from a {@link CoLocationGroup}, copying its id and the ids of its
+	 * vertices into a new list at the time of the call.
+	 *
+	 * @param group the group to describe
+	 * @return a descriptor of the given group
+	 */
+	public static CoLocationGroupDesc from(final CoLocationGroup group) {
+		return new CoLocationGroupDesc(
+			group.getId(),
+			group.getVertices().stream().map(JobVertex::getID).collect(Collectors.toList()));
+	}
+
+	/**
+	 * Creates a descriptor with a freshly generated group id for the given vertex ids.
+	 *
+	 * @param ids ids of the job vertices in the group, must not be null
+	 * @return a descriptor holding a private copy of the given ids
+	 */
+	@VisibleForTesting
+	public static CoLocationGroupDesc from(final JobVertexID... ids) {
+		// Arrays.asList is a write-through view of its argument. Wrapping a private copy keeps
+		// later changes to the caller's array from leaking into this read-only descriptor.
+		return new CoLocationGroupDesc(new AbstractID(), Arrays.asList(ids.clone()));
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
new file mode 100644
index 0000000..45fb652
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Represents execution vertices that will run in the same shared slot.
+ *
+ * <p>The group starts out empty and is populated through {@link #addVertex(ExecutionVertexID)}.
+ */
+class ExecutionSlotSharingGroup {
+
+	/** Ids of the execution vertices that are members of this group. */
+	private final Set<ExecutionVertexID> executionVertexIds;
+
+	/** Creates a group without any member vertices. */
+	ExecutionSlotSharingGroup() {
+		executionVertexIds = new HashSet<>();
+	}
+
+	/**
+	 * Adds the given execution vertex to this group.
+	 *
+	 * @param executionVertexId id of the vertex to add; an id equal to one that is already
+	 *                          present leaves the group unchanged
+	 */
+	void addVertex(final ExecutionVertexID executionVertexId) {
+		this.executionVertexIds.add(executionVertexId);
+	}
+
+	/**
+	 * Returns the ids of the member vertices.
+	 *
+	 * @return an unmodifiable view of the member ids; vertices added later are visible through
+	 *         the returned set
+	 */
+	Set<ExecutionVertexID> getExecutionVertexIds() {
+		final Set<ExecutionVertexID> readOnlyView = Collections.unmodifiableSet(executionVertexIds);
+		return readOnlyView;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java
new file mode 100644
index 0000000..dc1815e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Set;
+
+/**
+ * Strategy which determines {@link ExecutionSlotSharingGroup} for each execution vertex.
+ */
+interface SlotSharingStrategy {
+
+	/**
+	 * Looks up the {@link ExecutionSlotSharingGroup} determined for the given execution vertex.
+	 *
+	 * @param executionVertexId id of the execution vertex to look up
+	 * @return the group determined for that vertex. NOTE(review): the result for an unknown id
+	 *         (null vs. exception) is not specified here; confirm against the implementations
+	 */
+	ExecutionSlotSharingGroup getExecutionSlotSharingGroup(
+		ExecutionVertexID executionVertexId);
+
+	/**
+	 * Returns the execution slot sharing groups of this strategy.
+	 *
+	 * @return the set of groups; presumably every group determined by this strategy, to be
+	 *         verified against the implementations
+	 */
+	Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups();
+
+	/**
+	 * Factory which creates a {@link SlotSharingStrategy}.
+	 */
+	@FunctionalInterface
+	interface Factory {
+		/**
+		 * Creates a strategy from the given inputs.
+		 *
+		 * @param topology the scheduling topology
+		 * @param logicalSlotSharingGroups the logical slot sharing groups
+		 * @param coLocationGroups descriptors of the co-location groups
+		 * @return the created strategy
+		 */
+		SlotSharingStrategy create(
+			SchedulingTopology topology,
+			Set<SlotSharingGroup> logicalSlotSharingGroups,
+			Set<CoLocationGroupDesc> coLocationGroups);
+	}
+}