You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/08 08:44:31 UTC

[flink] 02/02: [FLINK-12227][runtime] Introduce SchedulingStrategy interface

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a4c3dc54b462c44987b36bb6af6f7a570d829a7
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Mon Apr 22 11:32:39 2019 +0800

    [FLINK-12227][runtime] Introduce SchedulingStrategy interface
    
    This closes #8233.
---
 .../flink/runtime/scheduler/DeploymentOption.java  | 35 ++++++++++++
 .../scheduler/ExecutionVertexDeploymentOption.java | 36 +++++++++++++
 .../runtime/scheduler/SchedulerOperations.java     | 36 +++++++++++++
 .../scheduler/strategy/SchedulingStrategy.java     | 62 ++++++++++++++++++++++
 .../strategy/SchedulingStrategyFactory.java        | 33 ++++++++++++
 5 files changed, 202 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
new file mode 100644
index 0000000..9fb9ace
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+/**
+ * Deployment option which indicates whether the task should send a scheduleOrUpdateConsumer message to the master; the flag is immutable once constructed.
+ */
+public class DeploymentOption {
+
+	private final boolean sendScheduleOrUpdateConsumerMessage;
+
+	public DeploymentOption(boolean sendScheduleOrUpdateConsumerMessage) {
+		this.sendScheduleOrUpdateConsumerMessage = sendScheduleOrUpdateConsumerMessage;
+	}
+
+	public boolean sendScheduleOrUpdateConsumerMessage() {
+		return sendScheduleOrUpdateConsumerMessage;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
new file mode 100644
index 0000000..829f6ba
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+/**
+ * Component that stores the task that needs to be scheduled and the option for its deployment.
+ */
+public class ExecutionVertexDeploymentOption {
+
+	private final ExecutionVertexID executionVertexId;
+
+	private final DeploymentOption deploymentOption;
+
+	public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) {
+		this.executionVertexId = executionVertexId;
+		this.deploymentOption = deploymentOption;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java
new file mode 100644
index 0000000..50d3f87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
+
+import java.util.Collection;
+
+/**
+ * Component which is used by {@link SchedulingStrategy} to commit scheduling decisions.
+ */
+public interface SchedulerOperations {
+
+	/**
+	 * Allocate slots and deploy the vertices when slots are returned.
+	 *
+	 * @param executionVertexDeploymentOptions The tasks to be deployed and their deployment options
+	 */
+	void allocateSlotsAndDeploy(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
new file mode 100644
index 0000000..4522a35
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.Set;
+
+/**
+ * Component which encapsulates the scheduling logic.
+ * It can react to execution state changes and partition consumable events.
+ * Moreover, it is responsible for resolving task failures.
+ */
+public interface SchedulingStrategy {
+
+	/**
+	 * Called when the scheduling is started (initial scheduling operation).
+	 */
+	void startScheduling();
+
+	/**
+	 * Called whenever vertices need to be restarted (due to task failure).
+	 *
+	 * @param verticesNeedingRestart The tasks that need to be restarted
+	 */
+	void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart);
+
+	/**
+	 * Called whenever an {@link Execution} changes its state.
+	 *
+	 * @param executionVertexId The id of the task
+	 * @param executionState The new state of the execution
+	 */
+	void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState);
+
+	/**
+	 * Called whenever an {@link IntermediateResultPartition} becomes consumable.
+	 *
+	 * @param executionVertexId The id of the producer
+	 * @param resultPartitionId The id of the result partition
+	 */
+	void onPartitionConsumable(ExecutionVertexID executionVertexId, ResultPartitionID resultPartitionId);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java
new file mode 100644
index 0000000..28b8257
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+/**
+ * Factory interface for {@link SchedulingStrategy}; an instance is created from the scheduler operations, the scheduling topology and the job graph.
+ */
+public interface SchedulingStrategyFactory {
+
+	SchedulingStrategy createInstance(
+			SchedulerOperations schedulerOperations,
+			SchedulingTopology schedulingTopology,
+			JobGraph jobGraph);
+}