You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/29 15:08:49 UTC
flink git commit: [FLINK-4525] [core] Drop special cases for
'StrictlyLocalAssignment' and 'PredeterminedAssignment'
Repository: flink
Updated Branches:
refs/heads/master 1d53a40a6 -> 578e80e3c
[FLINK-4525] [core] Drop special cases for 'StrictlyLocalAssignment' and 'PredeterminedAssignment'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/578e80e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/578e80e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/578e80e3
Branch: refs/heads/master
Commit: 578e80e3c161601d22760ef2ea0e52e6ae963786
Parents: 1d53a40
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Aug 27 15:23:38 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 29 17:07:55 2016 +0200
----------------------------------------------------------------------
.../api/common/io/StrictlyLocalAssignment.java | 24 -
.../executiongraph/ExecutionJobVertex.java | 104 +----
.../executiongraph/LocalInputSplitsTest.java | 436 -------------------
3 files changed, 8 insertions(+), 556 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java b/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
deleted file mode 100644
index e20107b..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.io;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-@PublicEvolving
-public interface StrictlyLocalAssignment {}
http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7b28b31..6272151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.io.StrictlyLocalAssignment;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
@@ -30,7 +29,6 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -81,11 +79,9 @@ public class ExecutionJobVertex {
private final CoLocationGroup coLocationGroup;
private final InputSplit[] inputSplits;
-
- private List<LocatableInputSplit>[] inputSplitsPerSubtask;
-
+
private InputSplitAssigner splitAssigner;
-
+
public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
int defaultParallelism, FiniteDuration timeout) throws JobException {
this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
@@ -155,12 +151,7 @@ public class ExecutionJobVertex {
inputSplits = splitSource.createInputSplits(numTaskVertices);
if (inputSplits != null) {
- if (splitSource instanceof StrictlyLocalAssignment) {
- inputSplitsPerSubtask = computeLocalInputSplitsPerTask(inputSplits);
- splitAssigner = new PredeterminedInputSplitAssigner(inputSplitsPerSubtask);
- } else {
- splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
- }
+ splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
}
}
else {
@@ -278,48 +269,8 @@ public class ExecutionJobVertex {
//---------------------------------------------------------------------------------------------
public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
-
ExecutionVertex[] vertices = this.taskVertices;
-
- // check if we need to do pre-assignment of tasks
- if (inputSplitsPerSubtask != null) {
-
- final Map<String, List<Instance>> instances = scheduler.getInstancesByHost();
- final Map<String, Integer> assignments = new HashMap<String, Integer>();
-
- for (int i = 0; i < vertices.length; i++) {
- List<LocatableInputSplit> splitsForHost = inputSplitsPerSubtask[i];
- if (splitsForHost == null || splitsForHost.isEmpty()) {
- continue;
- }
-
- String[] hostNames = splitsForHost.get(0).getHostnames();
- if (hostNames == null || hostNames.length == 0 || hostNames[0] == null) {
- continue;
- }
-
- String host = hostNames[0];
- ExecutionVertex v = vertices[i];
-
- List<Instance> instancesOnHost = instances.get(host);
-
- if (instancesOnHost == null || instancesOnHost.isEmpty()) {
- throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host
- + ". No TaskManager available on that host.");
- }
-
- Integer pos = assignments.get(host);
- if (pos == null) {
- pos = 0;
- assignments.put(host, 0);
- } else {
- assignments.put(host, (pos + 1) % instancesOnHost.size());
- }
-
- v.setLocationConstraintHosts(Collections.singletonList(instancesOnHost.get(pos)));
- }
- }
-
+
// kick off the tasks
for (ExecutionVertex ev : vertices) {
ev.scheduleForExecution(scheduler, queued);
@@ -374,17 +325,10 @@ public class ExecutionJobVertex {
// set up the input splits again
try {
if (this.inputSplits != null) {
-
- if (inputSplitsPerSubtask == null) {
- // lazy assignment
- @SuppressWarnings("unchecked")
- InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
- this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
- }
- else {
- // eager assignment
- //TODO: this.splitAssigner = new AssignBasedOnPreAssignment();
- }
+ // lazy assignment
+ @SuppressWarnings("unchecked")
+ InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
+ this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
}
}
catch (Throwable t) {
@@ -426,7 +370,6 @@ public class ExecutionJobVertex {
inputSplits[i] = null;
}
}
- inputSplitsPerSubtask = null;
}
//---------------------------------------------------------------------------------------------
@@ -628,37 +571,6 @@ public class ExecutionJobVertex {
return subTaskSplitAssignment;
}
-
-
- /**
- * An InputSplitAssigner that assigns to pre-determined hosts.
- */
- public static class PredeterminedInputSplitAssigner implements InputSplitAssigner {
-
- private List<LocatableInputSplit>[] inputSplitsPerSubtask;
-
- @SuppressWarnings("unchecked")
- public PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] inputSplitsPerSubtask) {
- // copy input split assignment
- this.inputSplitsPerSubtask = (List<LocatableInputSplit>[]) new List<?>[inputSplitsPerSubtask.length];
- for (int i = 0; i < inputSplitsPerSubtask.length; i++) {
- List<LocatableInputSplit> next = inputSplitsPerSubtask[i];
-
- this.inputSplitsPerSubtask[i] = next == null || next.isEmpty() ?
- Collections.<LocatableInputSplit>emptyList() :
- new ArrayList<LocatableInputSplit>(inputSplitsPerSubtask[i]);
- }
- }
-
- @Override
- public InputSplit getNextInputSplit(String host, int taskId) {
- if (inputSplitsPerSubtask[taskId].isEmpty()) {
- return null;
- } else {
- return inputSplitsPerSubtask[taskId].remove(inputSplitsPerSubtask[taskId].size() - 1);
- }
- }
- }
public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {
http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
deleted file mode 100644
index f03370c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.StrictlyLocalAssignment;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.core.io.InputSplitSource;
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import scala.concurrent.duration.FiniteDuration;
-
-public class LocalInputSplitsTest {
-
- private static final FiniteDuration TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
-
- // --------------------------------------------------------------------------------------------
-
- @Test
- public void testNotEnoughSubtasks() {
- int numHosts = 3;
- int slotsPerHost = 1;
- int parallelism = 2;
-
- TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
- new TestLocatableInputSplit(1, "host1"),
- new TestLocatableInputSplit(2, "host2"),
- new TestLocatableInputSplit(3, "host3")
- };
-
- // This should fail with an exception, since the parallelism of 2 does not
- // support strictly local assignment onto 3 hosts
- try {
- runTests(numHosts, slotsPerHost, parallelism, splits);
- fail("should throw an exception");
- }
- catch (JobException e) {
- // what a great day!
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testDisallowMultipleLocations() {
- int numHosts = 2;
- int slotsPerHost = 1;
- int parallelism = 2;
-
- TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
- new TestLocatableInputSplit(1, new String[] { "host1", "host2" } ),
- new TestLocatableInputSplit(2, new String[] { "host1", "host2" } )
- };
-
- // This should fail with an exception, since strictly local assignment
- // currently supports only one choice of host
- try {
- runTests(numHosts, slotsPerHost, parallelism, splits);
- fail("should throw an exception");
- }
- catch (JobException e) {
- // dandy!
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testNonExistentHost() {
- int numHosts = 2;
- int slotsPerHost = 1;
- int parallelism = 2;
-
- TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
- new TestLocatableInputSplit(1, "host1"),
- new TestLocatableInputSplit(2, "bogus_host" )
- };
-
- // This should fail with an exception, since one of the hosts does not exist
- try {
- runTests(numHosts, slotsPerHost, parallelism, splits);
- fail("should throw an exception");
- }
- catch (JobException e) {
- // dandy!
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testEqualSplitsPerHostAndSubtask() {
- int numHosts = 5;
- int slotsPerHost = 2;
- int parallelism = 10;
-
- TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
- new TestLocatableInputSplit(7, "host4"),
- new TestLocatableInputSplit(8, "host4"),
- new TestLocatableInputSplit(1, "host1"),
- new TestLocatableInputSplit(2, "host1"),
- new TestLocatableInputSplit(3, "host2"),
- new TestLocatableInputSplit(4, "host2"),
- new TestLocatableInputSplit(5, "host3"),
- new TestLocatableInputSplit(6, "host3"),
- new TestLocatableInputSplit(9, "host5"),
- new TestLocatableInputSplit(10, "host5")
- };
-
- try {
- String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits);
-
- assertEquals("host1", hostsForTasks[0]);
- assertEquals("host1", hostsForTasks[1]);
- assertEquals("host2", hostsForTasks[2]);
- assertEquals("host2", hostsForTasks[3]);
- assertEquals("host3", hostsForTasks[4]);
- assertEquals("host3", hostsForTasks[5]);
- assertEquals("host4", hostsForTasks[6]);
- assertEquals("host4", hostsForTasks[7]);
- assertEquals("host5", hostsForTasks[8]);
- assertEquals("host5", hostsForTasks[9]);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testNonEqualSplitsPerhost() {
- int numHosts = 3;
- int slotsPerHost = 2;
- int parallelism = 5;
-
- TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
- new TestLocatableInputSplit(1, "host3"),
- new TestLocatableInputSplit(2, "host1"),
- new TestLocatableInputSplit(3, "host1"),
- new TestLocatableInputSplit(4, "host1"),
- new TestLocatableInputSplit(5, "host1"),
- new TestLocatableInputSplit(6, "host1"),
- new TestLocatableInputSplit(7, "host2"),
- new TestLocatableInputSplit(8, "host2")
- };
-
- try {
- String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits);
-
- assertEquals("host1", hostsForTasks[0]);
- assertEquals("host1", hostsForTasks[1]);
- assertEquals("host2", hostsForTasks[2]);
- assertEquals("host2", hostsForTasks[3]);
- assertEquals("host3", hostsForTasks[4]);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testWithSubtasksEmpty() {
- int numHosts = 3;
- int slotsPerHost = 5;
- int parallelism = 7;
-
- // host one gets three subtasks (but two remain empty)
- // host two get two subtasks where one gets two splits, the other one split
- // host three gets two subtasks where one gets five splits, the other gets four splits
-
- TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
- new TestLocatableInputSplit(1, "host1"),
- new TestLocatableInputSplit(2, "host2"),
- new TestLocatableInputSplit(3, "host2"),
- new TestLocatableInputSplit(4, "host2"),
- new TestLocatableInputSplit(5, "host3"),
- new TestLocatableInputSplit(6, "host3"),
- new TestLocatableInputSplit(7, "host3"),
- new TestLocatableInputSplit(8, "host3"),
- new TestLocatableInputSplit(9, "host3"),
- new TestLocatableInputSplit(10, "host3"),
- new TestLocatableInputSplit(11, "host3"),
- new TestLocatableInputSplit(12, "host3"),
- new TestLocatableInputSplit(13, "host3")
- };
-
- try {
- String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits);
-
- assertEquals("host1", hostsForTasks[0]);
-
- assertEquals("host2", hostsForTasks[1]);
- assertEquals("host2", hostsForTasks[2]);
-
- assertEquals("host3", hostsForTasks[3]);
- assertEquals("host3", hostsForTasks[4]);
-
- // the current assignment leaves those with empty constraints
- assertTrue(hostsForTasks[5].equals("host1") || hostsForTasks[5].equals("host2") || hostsForTasks[5].equals("host3"));
- assertTrue(hostsForTasks[6].equals("host1") || hostsForTasks[6].equals("host2") || hostsForTasks[6].equals("host3"));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultipleInstancesPerHost() {
-
- TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
- new TestLocatableInputSplit(1, "host1"),
- new TestLocatableInputSplit(2, "host1"),
- new TestLocatableInputSplit(3, "host2"),
- new TestLocatableInputSplit(4, "host2"),
- new TestLocatableInputSplit(5, "host3"),
- new TestLocatableInputSplit(6, "host3")
- };
-
- try {
- JobVertex vertex = new JobVertex("test vertex");
- vertex.setParallelism(6);
- vertex.setInvokableClass(DummyInvokable.class);
- vertex.setInputSplitSource(new TestInputSplitSource(splits));
-
- JobGraph jobGraph = new JobGraph("test job", vertex);
-
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- jobGraph.getJobID(),
- jobGraph.getName(),
- jobGraph.getJobConfiguration(),
- new SerializedValue<>(new ExecutionConfig()),
- TIMEOUT,
- new NoRestartStrategy());
-
- eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
- eg.setQueuedSchedulingAllowed(false);
-
- // create a scheduler with 6 instances where always two are on the same host
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
- Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, "host1", 1);
- Instance i2 = getInstance(new byte[] {10,0,1,1}, 12346, "host1", 1);
- Instance i3 = getInstance(new byte[] {10,0,1,2}, 12345, "host2", 1);
- Instance i4 = getInstance(new byte[] {10,0,1,2}, 12346, "host2", 1);
- Instance i5 = getInstance(new byte[] {10,0,1,3}, 12345, "host3", 1);
- Instance i6 = getInstance(new byte[] {10,0,1,3}, 12346, "host4", 1);
- scheduler.newInstanceAvailable(i1);
- scheduler.newInstanceAvailable(i2);
- scheduler.newInstanceAvailable(i3);
- scheduler.newInstanceAvailable(i4);
- scheduler.newInstanceAvailable(i5);
- scheduler.newInstanceAvailable(i6);
-
- eg.scheduleForExecution(scheduler);
-
- ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices();
- assertEquals(6, tasks.length);
-
- Instance taskInstance1 = tasks[0].getCurrentAssignedResource().getInstance();
- Instance taskInstance2 = tasks[1].getCurrentAssignedResource().getInstance();
- Instance taskInstance3 = tasks[2].getCurrentAssignedResource().getInstance();
- Instance taskInstance4 = tasks[3].getCurrentAssignedResource().getInstance();
- Instance taskInstance5 = tasks[4].getCurrentAssignedResource().getInstance();
- Instance taskInstance6 = tasks[5].getCurrentAssignedResource().getInstance();
-
- assertTrue (taskInstance1 == i1 || taskInstance1 == i2);
- assertTrue (taskInstance2 == i1 || taskInstance2 == i2);
- assertTrue (taskInstance3 == i3 || taskInstance3 == i4);
- assertTrue (taskInstance4 == i3 || taskInstance4 == i4);
- assertTrue (taskInstance5 == i5 || taskInstance5 == i6);
- assertTrue (taskInstance6 == i5 || taskInstance6 == i6);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static String[] runTests(int numHosts, int slotsPerHost, int parallelism,
- TestLocatableInputSplit[] splits)
- throws Exception
- {
- JobVertex vertex = new JobVertex("test vertex");
- vertex.setParallelism(parallelism);
- vertex.setInvokableClass(DummyInvokable.class);
- vertex.setInputSplitSource(new TestInputSplitSource(splits));
-
- JobGraph jobGraph = new JobGraph("test job", vertex);
-
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- jobGraph.getJobID(),
- jobGraph.getName(),
- jobGraph.getJobConfiguration(),
- new SerializedValue<>(new ExecutionConfig()),
- TIMEOUT,
- new NoRestartStrategy());
-
- eg.setQueuedSchedulingAllowed(false);
-
- eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
- Scheduler scheduler = getScheduler(numHosts, slotsPerHost);
- eg.scheduleForExecution(scheduler);
-
- ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices();
- assertEquals(parallelism, tasks.length);
-
- String[] hostsForTasks = new String[parallelism];
- for (int i = 0; i < parallelism; i++) {
- hostsForTasks[i] = tasks[i].getCurrentAssignedResourceLocation().getHostname();
- }
-
- return hostsForTasks;
- }
-
- private static Scheduler getScheduler(int numInstances, int numSlotsPerInstance) throws Exception {
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
- for (int i = 0; i < numInstances; i++) {
- byte[] ipAddress = new byte[] { 10, 0, 1, (byte) (1 + i) };
- int dataPort = 12001 + i;
- String host = "host" + (i+1);
-
- Instance instance = getInstance(ipAddress, dataPort, host, numSlotsPerInstance);
- scheduler.newInstanceAvailable(instance);
- }
- return scheduler;
- }
-
- private static Instance getInstance(byte[] ipAddress, int dataPort, String hostname, int slots) throws Exception {
- HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
-
- InstanceConnectionInfo connection = mock(InstanceConnectionInfo.class);
- when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress));
- when(connection.dataPort()).thenReturn(dataPort);
- when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
- when(connection.getHostname()).thenReturn(hostname);
- when(connection.getFQDNHostname()).thenReturn(hostname);
-
- return new Instance(
- new ExecutionGraphTestUtils.SimpleActorGateway(
- TestingUtils.defaultExecutionContext()),
- connection,
- ResourceID.generate(),
- new InstanceID(),
- hardwareDescription,
- slots);
- }
-
- // --------------------------------------------------------------------------------------------
-
- // custom class to ensure behavior works for subclasses of LocatableInputSplit
- private static class TestLocatableInputSplit extends LocatableInputSplit {
-
- private static final long serialVersionUID = 1L;
-
- public TestLocatableInputSplit(int splitNumber, String hostname) {
- super(splitNumber, hostname);
- }
-
- public TestLocatableInputSplit(int splitNumber, String[] hostnames) {
- super(splitNumber, hostnames);
- }
- }
-
- private static class TestInputSplitSource implements InputSplitSource<TestLocatableInputSplit>,
- StrictlyLocalAssignment
- {
- private static final long serialVersionUID = 1L;
-
- private final TestLocatableInputSplit[] splits;
-
- public TestInputSplitSource(TestLocatableInputSplit[] splits) {
- this.splits = splits;
- }
-
- @Override
- public TestLocatableInputSplit[] createInputSplits(int minNumSplits) {
- return splits;
- }
-
- @Override
- public InputSplitAssigner getInputSplitAssigner(TestLocatableInputSplit[] inputSplits) {
- fail("This method should not be called on StrictlyLocalAssignment splits.");
- return null; // silence the compiler
- }
- }
-}