You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/08 08:44:30 UTC

[flink] 01/02: [FLINK-12227][runtime] Add interfaces for SchedulingTopology

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8c862e5fba73a335995c1667bb036ab80edf579e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Apr 18 17:57:11 2019 +0200

    [FLINK-12227][runtime] Add interfaces for SchedulingTopology
    
    The SchedulingTopology contains the information which is provided to the
    SchedulingStrategy in order to decide which execution vertices should be
    scheduled next.
---
 .../scheduler/strategy/ExecutionVertexID.java      | 76 +++++++++++++++++
 .../strategy/SchedulingExecutionVertex.java        | 58 +++++++++++++
 .../strategy/SchedulingResultPartition.java        | 99 ++++++++++++++++++++++
 .../scheduler/strategy/SchedulingTopology.java     | 53 ++++++++++++
 4 files changed, 286 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
new file mode 100644
index 0000000..6e44e11
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Id identifying an {@link ExecutionVertex}, composed of a {@link JobVertexID} and a non-negative subtask index.
+ */
+public class ExecutionVertexID {
+	private final JobVertexID jobVertexId;
+
+	private final int subtaskIndex;
+
+	public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
+		checkArgument(subtaskIndex >= 0, "subtaskIndex must be greater than or equal to 0");
+
+		this.jobVertexId = checkNotNull(jobVertexId);
+		this.subtaskIndex = subtaskIndex;
+	}
+
+	public JobVertexID getJobVertexId() {
+		return jobVertexId;
+	}
+
+	public int getSubtaskIndex() {
+		return subtaskIndex;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		ExecutionVertexID that = (ExecutionVertexID) o;
+
+		return subtaskIndex == that.subtaskIndex && jobVertexId.equals(that.jobVertexId);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = jobVertexId.hashCode();
+		result = 31 * result + subtaskIndex;
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return jobVertexId + "_" + subtaskIndex;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
new file mode 100644
index 0000000..b9b2271
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.Collection;
+
+/**
+ * Scheduling representation of {@link ExecutionVertex}.
+ */
+public interface SchedulingExecutionVertex {
+
+	/**
+	 * Gets id of the execution vertex.
+	 *
+	 * @return id of the execution vertex
+	 */
+	ExecutionVertexID getId();
+
+	/**
+	 * Gets the state of the execution vertex.
+	 *
+	 * @return state of the execution vertex
+	 */
+	ExecutionState getState();
+
+	/**
+	 * Gets all consumed result partitions.
+	 *
+	 * @return collection of consumed (input) result partitions
+	 */
+	Collection<SchedulingResultPartition> getConsumedResultPartitions();
+
+	/**
+	 * Gets all produced result partitions.
+	 *
+	 * @return collection of produced (output) result partitions
+	 */
+	Collection<SchedulingResultPartition> getProducedResultPartitions();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
new file mode 100644
index 0000000..aefc561
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+
+/**
+ * Scheduling representation of {@link IntermediateResultPartition}.
+ */
+public interface SchedulingResultPartition {
+
+	/**
+	 * Gets id of the result partition.
+	 *
+	 * @return id of the result partition
+	 */
+	IntermediateResultPartitionID getId();
+
+	/**
+	 * Gets id of the intermediate result.
+	 *
+	 * @return id of the intermediate result
+	 */
+	IntermediateDataSetID getResultId();
+
+	/**
+	 * Gets the {@link ResultPartitionType}.
+	 *
+	 * @return result partition type
+	 */
+	ResultPartitionType getPartitionType();
+
+	/**
+	 * Gets the {@link ResultPartitionState}.
+	 *
+	 * @return result partition state
+	 */
+	ResultPartitionState getState();
+
+	/**
+	 * Gets the producer of this result partition.
+	 *
+	 * @return producer vertex of this result partition
+	 */
+	SchedulingExecutionVertex getProducer();
+
+	/**
+	 * Gets the consumers of this result partition.
+	 *
+	 * @return collection of consumer vertices of this result partition
+	 */
+	Collection<SchedulingExecutionVertex> getConsumers();
+
+	/**
+	 * State of the result partition.
+	 */
+	enum ResultPartitionState {
+		/**
+		 * Producer is not yet running.
+		 */
+		EMPTY,
+
+		/**
+		 * Producer is running.
+		 */
+		PRODUCING,
+
+		/**
+		 * Producer has terminated.
+		 */
+		DONE,
+
+		/**
+		 * Partition has been released.
+		 */
+		RELEASED
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
new file mode 100644
index 0000000..eb117d2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Optional;
+
+/**
+ * Topology of {@link SchedulingExecutionVertex}.
+ */
+public interface SchedulingTopology {
+
+	/**
+	 * Returns an iterable over all {@link SchedulingExecutionVertex} instances in
+	 * topologically sorted order.
+	 *
+	 * @return Iterable over all scheduling vertices in topologically sorted order
+	 */
+	Iterable<SchedulingExecutionVertex> getVertices();
+
+	/**
+	 * Looks up the {@link SchedulingExecutionVertex} for the given {@link ExecutionVertexID}.
+	 *
+	 * @param executionVertexId identifying the respective scheduling vertex
+	 * @return Optional containing the respective scheduling vertex, or an empty Optional if the vertex does not exist
+	 */
+	Optional<SchedulingExecutionVertex> getVertex(ExecutionVertexID executionVertexId);
+
+	/**
+	 * Looks up the {@link SchedulingResultPartition} for the given {@link IntermediateResultPartitionID}.
+	 *
+	 * @param intermediateResultPartitionId identifying the respective scheduling result partition
+	 * @return Optional containing the respective scheduling result partition, or an empty Optional if the partition does not exist
+	 */
+	Optional<SchedulingResultPartition> getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId);
+}