You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/08 08:44:30 UTC
[flink] 01/02: [FLINK-12227][runtime] Add interfaces for
SchedulingTopology
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8c862e5fba73a335995c1667bb036ab80edf579e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Apr 18 17:57:11 2019 +0200
[FLINK-12227][runtime] Add interfaces for SchedulingTopology
The SchedulingTopology contains the information which is provided to the
SchedulingStrategy in order to decide which execution vertices should be
scheduled next.
---
.../scheduler/strategy/ExecutionVertexID.java | 76 +++++++++++++++++
.../strategy/SchedulingExecutionVertex.java | 58 +++++++++++++
.../strategy/SchedulingResultPartition.java | 99 ++++++++++++++++++++++
.../scheduler/strategy/SchedulingTopology.java | 53 ++++++++++++
4 files changed, 286 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
new file mode 100644
index 0000000..6e44e11
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Identifies a single {@link ExecutionVertex} by the id of its job vertex
+ * together with the index of the subtask within that job vertex.
+ */
+public class ExecutionVertexID {
+	private final JobVertexID jobVertexId;
+	private final int subtaskIndex;
+
+	/**
+	 * Creates the id of the given subtask of the given job vertex.
+	 *
+	 * @param jobVertexId id of the job vertex; must not be null
+	 * @param subtaskIndex index of the subtask; must be non-negative
+	 */
+	public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
+		checkArgument(subtaskIndex >= 0, "subtaskIndex must be greater than or equal to 0");
+		this.jobVertexId = checkNotNull(jobVertexId);
+		this.subtaskIndex = subtaskIndex;
+	}
+
+	public JobVertexID getJobVertexId() {
+		return jobVertexId;
+	}
+
+	public int getSubtaskIndex() {
+		return subtaskIndex;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (o == this) {
+			return true;
+		}
+		if (o != null && o.getClass() == getClass()) {
+			final ExecutionVertexID other = (ExecutionVertexID) o;
+			return subtaskIndex == other.subtaskIndex && jobVertexId.equals(other.jobVertexId);
+		}
+		return false;
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * jobVertexId.hashCode() + subtaskIndex;
+	}
+
+	@Override
+	public String toString() {
+		return jobVertexId + "_" + subtaskIndex;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
new file mode 100644
index 0000000..b9b2271
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.Collection;
+
+/**
+ * Scheduling representation of {@link ExecutionVertex}.
+ */
+public interface SchedulingExecutionVertex {
+
+	/**
+	 * Gets the id of the execution vertex.
+	 *
+	 * @return id of the execution vertex
+	 */
+	ExecutionVertexID getId();
+
+	/**
+	 * Gets the state of the execution vertex.
+	 *
+	 * @return state of the execution vertex
+	 */
+	ExecutionState getState();
+
+	/**
+	 * Gets all result partitions consumed by this vertex.
+	 *
+	 * @return collection of consumed (input) result partitions
+	 */
+	Collection<SchedulingResultPartition> getConsumedResultPartitions();
+
+	/**
+	 * Gets all result partitions produced by this vertex.
+	 *
+	 * @return collection of produced (output) result partitions
+	 */
+	Collection<SchedulingResultPartition> getProducedResultPartitions();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
new file mode 100644
index 0000000..aefc561
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+
+/**
+ * Scheduling representation of {@link IntermediateResultPartition}.
+ */
+public interface SchedulingResultPartition {
+
+	/**
+	 * Returns the id of this result partition.
+	 *
+	 * @return id of this result partition
+	 */
+	IntermediateResultPartitionID getId();
+
+	/**
+	 * Returns the id of the intermediate result of this partition.
+	 *
+	 * @return id of the intermediate result
+	 */
+	IntermediateDataSetID getResultId();
+
+	/**
+	 * Returns the vertex which produces this result partition.
+	 *
+	 * @return producing vertex of this result partition
+	 */
+	SchedulingExecutionVertex getProducer();
+
+	/**
+	 * Returns the vertices which consume this result partition.
+	 *
+	 * @return collection of consuming vertices of this result partition
+	 */
+	Collection<SchedulingExecutionVertex> getConsumers();
+
+	/**
+	 * Returns the {@link ResultPartitionType} of this partition.
+	 *
+	 * @return type of this result partition
+	 */
+	ResultPartitionType getPartitionType();
+
+	/**
+	 * Returns the {@link ResultPartitionState} of this partition.
+	 *
+	 * @return state of this result partition
+	 */
+	ResultPartitionState getState();
+
+	/**
+	 * Possible states of a result partition.
+	 */
+	enum ResultPartitionState {
+		/**
+		 * The producer has not started running yet.
+		 */
+		EMPTY,
+
+		/**
+		 * The producer is running.
+		 */
+		PRODUCING,
+
+		/**
+		 * The producer has terminated.
+		 */
+		DONE,
+
+		/**
+		 * The partition has been released.
+		 */
+		RELEASED
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
new file mode 100644
index 0000000..eb117d2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Optional;
+
+/**
+ * Topology of {@link SchedulingExecutionVertex SchedulingExecutionVertices} and of the
+ * {@link SchedulingResultPartition SchedulingResultPartitions} connecting them.
+ */
+public interface SchedulingTopology {
+
+	/**
+	 * Returns all scheduling vertices in topologically sorted order.
+	 *
+	 * @return iterable over all {@link SchedulingExecutionVertex}, topologically sorted
+	 */
+	Iterable<SchedulingExecutionVertex> getVertices();
+
+	/**
+	 * Looks up the {@link SchedulingExecutionVertex} identified by the given {@link ExecutionVertexID}.
+	 *
+	 * @param executionVertexId id of the scheduling vertex to look up
+	 * @return the matching scheduling vertex, or {@link Optional#empty()} if no such vertex exists
+	 */
+	Optional<SchedulingExecutionVertex> getVertex(ExecutionVertexID executionVertexId);
+
+	/**
+	 * Looks up the {@link SchedulingResultPartition} identified by the given {@link IntermediateResultPartitionID}.
+	 *
+	 * @param intermediateResultPartitionId id of the scheduling result partition to look up
+	 * @return the matching result partition, or {@link Optional#empty()} if no such partition exists
+	 */
+	Optional<SchedulingResultPartition> getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId);
+}