You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/08 13:05:50 UTC

[flink] branch master updated (9a4c3dc -> f8a6ea5)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9a4c3dc  [FLINK-12227][runtime] Introduce SchedulingStrategy interface
     new f6fc678  [FLINK-12228][runtime] Implement Eager Scheduling Strategy
     new f8a6ea5  [hotfix][tests] Rename variables in EagerSchedulingStrategyTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../scheduler/ExecutionVertexDeploymentOption.java |  14 ++-
 .../strategy/EagerSchedulingStrategy.java          | 112 +++++++++++++++++++
 .../scheduler/strategy/SchedulingStrategy.java     |   4 +-
 .../strategy/EagerSchedulingStrategyTest.java      | 121 +++++++++++++++++++++
 .../strategy/TestingSchedulerOperations.java}      |  25 +++--
 .../TestingSchedulingExecutionVertex.java}         |  31 ++++--
 .../strategy/TestingSchedulingTopology.java        |  65 +++++++++++
 7 files changed, 351 insertions(+), 21 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java => test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java} (55%)
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java => test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java} (53%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java


[flink] 01/02: [FLINK-12228][runtime] Implement Eager Scheduling Strategy

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6fc678297a29e16b73ff8d5159214d5afec0ef2
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Fri Apr 26 13:54:06 2019 +0800

    [FLINK-12228][runtime] Implement Eager Scheduling Strategy
    
    This closes #8296.
---
 .../scheduler/ExecutionVertexDeploymentOption.java |  14 ++-
 .../strategy/EagerSchedulingStrategy.java          | 112 +++++++++++++++++++
 .../scheduler/strategy/SchedulingStrategy.java     |   4 +-
 .../strategy/EagerSchedulingStrategyTest.java      | 121 +++++++++++++++++++++
 .../strategy/TestingSchedulerOperations.java       |  44 ++++++++
 .../strategy/TestingSchedulingExecutionVertex.java |  57 ++++++++++
 .../strategy/TestingSchedulingTopology.java        |  65 +++++++++++
 7 files changed, 413 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
index 829f6ba..9831a9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Component that stores the task need to be scheduled and the option for deployment.
  */
@@ -30,7 +32,15 @@ public class ExecutionVertexDeploymentOption {
 	private final DeploymentOption deploymentOption;
 
 	public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) {
-		this.executionVertexId = executionVertexId;
-		this.deploymentOption = deploymentOption;
+		this.executionVertexId = checkNotNull(executionVertexId);
+		this.deploymentOption = checkNotNull(deploymentOption);
+	}
+
+	public ExecutionVertexID getExecutionVertexId() {
+		return executionVertexId;
+	}
+
+	public DeploymentOption getDeploymentOption() {
+		return deploymentOption;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
new file mode 100644
index 0000000..954fff5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for streaming job which will schedule all tasks at the same time.
+ */
+public class EagerSchedulingStrategy implements SchedulingStrategy {
+
+	private final SchedulerOperations schedulerOperations;
+
+	private final SchedulingTopology schedulingTopology;
+
+	private final DeploymentOption deploymentOption = new DeploymentOption(false);
+
+	public EagerSchedulingStrategy(
+			SchedulerOperations schedulerOperations,
+			SchedulingTopology schedulingTopology) {
+		this.schedulerOperations = checkNotNull(schedulerOperations);
+		this.schedulingTopology = checkNotNull(schedulingTopology);
+	}
+
+	@Override
+	public void startScheduling() {
+		final Set<ExecutionVertexID> allVertices = getAllVerticesFromTopology();
+		allocateSlotsAndDeploy(allVertices);
+	}
+
+	@Override
+	public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+		allocateSlotsAndDeploy(verticesToRestart);
+	}
+
+	@Override
+	public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {
+		// Will not react to these notifications.
+	}
+
+	@Override
+	public void onPartitionConsumable(ExecutionVertexID executionVertexId, ResultPartitionID resultPartitionId) {
+		// Will not react to these notifications.
+	}
+
+	private void allocateSlotsAndDeploy(final Set<ExecutionVertexID> verticesToDeploy) {
+		final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions =
+				createExecutionVertexDeploymentOptions(verticesToDeploy);
+		schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+	}
+
+	private Set<ExecutionVertexID> getAllVerticesFromTopology() {
+		return StreamSupport
+				.stream(schedulingTopology.getVertices().spliterator(), false)
+				.map(SchedulingExecutionVertex::getId)
+				.collect(Collectors.toSet());
+	}
+
+	private List<ExecutionVertexDeploymentOption> createExecutionVertexDeploymentOptions(
+			final Collection<ExecutionVertexID> vertices) {
+		List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(vertices.size());
+		for (ExecutionVertexID executionVertexID : vertices) {
+			executionVertexDeploymentOptions.add(
+					new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+		}
+		return executionVertexDeploymentOptions;
+	}
+
+	/**
+	 * The factory for creating {@link EagerSchedulingStrategy}.
+	 */
+	public static class Factory implements SchedulingStrategyFactory {
+
+		@Override
+		public SchedulingStrategy createInstance(
+				SchedulerOperations schedulerOperations,
+				SchedulingTopology schedulingTopology,
+				JobGraph jobGraph) {
+			return new EagerSchedulingStrategy(schedulerOperations, schedulingTopology);
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
index 4522a35..5165891 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
@@ -40,9 +40,9 @@ public interface SchedulingStrategy {
 	/**
 	 * Called whenever vertices need to be restarted (due to task failure).
 	 *
-	 * @param verticesNeedingRestart The tasks need to be restarted
+	 * @param verticesToRestart The tasks need to be restarted
 	 */
-	void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart);
+	void restartTasks(Set<ExecutionVertexID> verticesToRestart);
 
 	/**
 	 * Called whenever an {@link Execution} changes its state.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
new file mode 100644
index 0000000..881159d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link EagerSchedulingStrategy}.
+ */
+public class EagerSchedulingStrategyTest extends TestLogger {
+
+	private TestingSchedulerOperations testingSchedulerOperations;
+
+	private TestingSchedulingTopology testingSchedulingTopology;
+
+	private EagerSchedulingStrategy schedulingStrategy;
+
+	@Before
+	public void setUp() {
+		testingSchedulerOperations = new TestingSchedulerOperations();
+		testingSchedulingTopology = new TestingSchedulingTopology();
+		schedulingStrategy = new EagerSchedulingStrategy(
+				testingSchedulerOperations,
+				testingSchedulingTopology);
+	}
+
+	/**
+	 * Tests that when start scheduling eager scheduling strategy will start all vertices in scheduling topology.
+	 */
+	@Test
+	public void testStartScheduling() {
+		JobVertexID jobVertexID = new JobVertexID();
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 0));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 1));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 2));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 3));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 4));
+
+		schedulingStrategy.startScheduling();
+
+		assertThat(testingSchedulerOperations.getScheduledVertices(), hasSize(1));
+
+		Collection<ExecutionVertexDeploymentOption> scheduledVertices = testingSchedulerOperations.getScheduledVertices().get(0);
+		assertThat(scheduledVertices, hasSize(5));
+		Collection<ExecutionVertexID> vertices = getExecutionVertexIdsFromDeployOptions(scheduledVertices);
+		for (SchedulingExecutionVertex schedulingExecutionVertex : testingSchedulingTopology.getVertices()) {
+			assertThat(vertices, hasItem(schedulingExecutionVertex.getId()));
+		}
+	}
+
+	/**
+	 * Tests that eager scheduling strategy will restart all vertices needing restarted at same time.
+	 */
+	@Test
+	public void testRestartTasks() {
+		JobVertexID jobVertexID = new JobVertexID();
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 0));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 1));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 2));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 3));
+		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 4));
+
+		Set<ExecutionVertexID> toBeRestartedVertices1 = new HashSet<>(Arrays.asList(
+				new ExecutionVertexID(jobVertexID, 0),
+				new ExecutionVertexID(jobVertexID, 4)));
+		schedulingStrategy.restartTasks(toBeRestartedVertices1);
+
+		Set<ExecutionVertexID> toBeRestartedVertices2 = new HashSet<>(Arrays.asList(
+				new ExecutionVertexID(jobVertexID, 1),
+				new ExecutionVertexID(jobVertexID, 2),
+				new ExecutionVertexID(jobVertexID, 3)));
+		schedulingStrategy.restartTasks(toBeRestartedVertices2);
+
+		assertThat(testingSchedulerOperations.getScheduledVertices(), hasSize(2));
+
+		Collection<ExecutionVertexDeploymentOption> scheduledVertices1 = testingSchedulerOperations.getScheduledVertices().get(0);
+		assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices1), containsInAnyOrder(toBeRestartedVertices1.toArray()));
+
+		Collection<ExecutionVertexDeploymentOption> scheduledVertices2 = testingSchedulerOperations.getScheduledVertices().get(1);
+		assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices2), containsInAnyOrder(toBeRestartedVertices2.toArray()));
+	}
+
+	private static Collection<ExecutionVertexID> getExecutionVertexIdsFromDeployOptions(
+			Collection<ExecutionVertexDeploymentOption> deploymentOptions) {
+		return deploymentOptions.stream()
+				.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
+				.collect(Collectors.toList());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java
new file mode 100644
index 0000000..3a67b39
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A Simple scheduler operation for testing purposes.
+ */
+public class TestingSchedulerOperations implements SchedulerOperations {
+
+	private final List<Collection<ExecutionVertexDeploymentOption>> scheduledVertices = new ArrayList<>();
+
+	@Override
+	public void allocateSlotsAndDeploy(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+		scheduledVertices.add(executionVertexDeploymentOptions);
+	}
+
+	public List<Collection<ExecutionVertexDeploymentOption>> getScheduledVertices() {
+		return Collections.unmodifiableList(scheduledVertices);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
new file mode 100644
index 0000000..8681fe1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A simple scheduling execution vertex for testing purposes.
+ */
+public class TestingSchedulingExecutionVertex implements SchedulingExecutionVertex {
+
+	private final ExecutionVertexID executionVertexId;
+
+	public TestingSchedulingExecutionVertex(JobVertexID jobVertexId, int subtaskIndex) {
+		this.executionVertexId = new ExecutionVertexID(jobVertexId, subtaskIndex);
+	}
+
+	@Override
+	public ExecutionVertexID getId() {
+		return executionVertexId;
+	}
+
+	@Override
+	public ExecutionState getState() {
+		return ExecutionState.CREATED;
+	}
+
+	@Override
+	public Collection<SchedulingResultPartition> getConsumedResultPartitions() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public Collection<SchedulingResultPartition> getProducedResultPartitions() {
+		return Collections.emptyList();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
new file mode 100644
index 0000000..e827f78
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A simple scheduling topology for testing purposes.
+ */
+public class TestingSchedulingTopology implements SchedulingTopology {
+
+	private final Map<ExecutionVertexID, SchedulingExecutionVertex> schedulingExecutionVertices = new HashMap<>();
+
+	private final Map<IntermediateResultPartitionID, SchedulingResultPartition> schedulingResultPartitions = new HashMap<>();
+
+	@Override
+	public Iterable<SchedulingExecutionVertex> getVertices() {
+		return Collections.unmodifiableCollection(schedulingExecutionVertices.values());
+	}
+
+	@Override
+	public Optional<SchedulingExecutionVertex> getVertex(ExecutionVertexID executionVertexId)  {
+		return Optional.ofNullable(schedulingExecutionVertices.get(executionVertexId));
+	}
+
+	@Override
+	public Optional<SchedulingResultPartition> getResultPartition(
+			IntermediateResultPartitionID intermediateResultPartitionId) {
+		return Optional.ofNullable(schedulingResultPartitions.get(intermediateResultPartitionId));
+	}
+
+	public void addSchedulingExecutionVertex(SchedulingExecutionVertex schedulingExecutionVertex) {
+		schedulingExecutionVertices.put(schedulingExecutionVertex.getId(), schedulingExecutionVertex);
+		addSchedulingResultPartitions(schedulingExecutionVertex.getConsumedResultPartitions());
+		addSchedulingResultPartitions(schedulingExecutionVertex.getProducedResultPartitions());
+	}
+
+	private void addSchedulingResultPartitions(final Collection<SchedulingResultPartition> resultPartitions) {
+		for (SchedulingResultPartition schedulingResultPartition : resultPartitions) {
+			schedulingResultPartitions.put(schedulingResultPartition.getId(), schedulingResultPartition);
+		}
+	}
+}


[flink] 02/02: [hotfix][tests] Rename variables in EagerSchedulingStrategyTest

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f8a6ea584b9841598f604761ea012136db86c97b
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed May 8 13:09:15 2019 +0200

    [hotfix][tests] Rename variables in EagerSchedulingStrategyTest
---
 .../strategy/EagerSchedulingStrategyTest.java          | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
index 881159d..1808bd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
@@ -73,10 +73,10 @@ public class EagerSchedulingStrategyTest extends TestLogger {
 		assertThat(testingSchedulerOperations.getScheduledVertices(), hasSize(1));
 
 		Collection<ExecutionVertexDeploymentOption> scheduledVertices = testingSchedulerOperations.getScheduledVertices().get(0);
-		assertThat(scheduledVertices, hasSize(5));
-		Collection<ExecutionVertexID> vertices = getExecutionVertexIdsFromDeployOptions(scheduledVertices);
+		Collection<ExecutionVertexID> scheduledVertexIDs = getExecutionVertexIdsFromDeployOptions(scheduledVertices);
+		assertThat(scheduledVertexIDs, hasSize(5));
 		for (SchedulingExecutionVertex schedulingExecutionVertex : testingSchedulingTopology.getVertices()) {
-			assertThat(vertices, hasItem(schedulingExecutionVertex.getId()));
+			assertThat(scheduledVertexIDs, hasItem(schedulingExecutionVertex.getId()));
 		}
 	}
 
@@ -92,24 +92,24 @@ public class EagerSchedulingStrategyTest extends TestLogger {
 		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 3));
 		testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 4));
 
-		Set<ExecutionVertexID> toBeRestartedVertices1 = new HashSet<>(Arrays.asList(
+		Set<ExecutionVertexID> verticesToRestart1 = new HashSet<>(Arrays.asList(
 				new ExecutionVertexID(jobVertexID, 0),
 				new ExecutionVertexID(jobVertexID, 4)));
-		schedulingStrategy.restartTasks(toBeRestartedVertices1);
+		schedulingStrategy.restartTasks(verticesToRestart1);
 
-		Set<ExecutionVertexID> toBeRestartedVertices2 = new HashSet<>(Arrays.asList(
+		Set<ExecutionVertexID> verticesToRestart2 = new HashSet<>(Arrays.asList(
 				new ExecutionVertexID(jobVertexID, 1),
 				new ExecutionVertexID(jobVertexID, 2),
 				new ExecutionVertexID(jobVertexID, 3)));
-		schedulingStrategy.restartTasks(toBeRestartedVertices2);
+		schedulingStrategy.restartTasks(verticesToRestart2);
 
 		assertThat(testingSchedulerOperations.getScheduledVertices(), hasSize(2));
 
 		Collection<ExecutionVertexDeploymentOption> scheduledVertices1 = testingSchedulerOperations.getScheduledVertices().get(0);
-		assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices1), containsInAnyOrder(toBeRestartedVertices1.toArray()));
+		assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices1), containsInAnyOrder(verticesToRestart1.toArray()));
 
 		Collection<ExecutionVertexDeploymentOption> scheduledVertices2 = testingSchedulerOperations.getScheduledVertices().get(1);
-		assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices2), containsInAnyOrder(toBeRestartedVertices2.toArray()));
+		assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices2), containsInAnyOrder(verticesToRestart2.toArray()));
 	}
 
 	private static Collection<ExecutionVertexID> getExecutionVertexIdsFromDeployOptions(