You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/29 15:08:49 UTC

flink git commit: [FLINK-4525] [core] Drop special cases for 'StrictlyLocalAssignment' and 'PredeterminedAssignment'

Repository: flink
Updated Branches:
  refs/heads/master 1d53a40a6 -> 578e80e3c


[FLINK-4525] [core] Drop special cases for 'StrictlyLocalAssignment' and 'PredeterminedAssignment'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/578e80e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/578e80e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/578e80e3

Branch: refs/heads/master
Commit: 578e80e3c161601d22760ef2ea0e52e6ae963786
Parents: 1d53a40
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Aug 27 15:23:38 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 29 17:07:55 2016 +0200

----------------------------------------------------------------------
 .../api/common/io/StrictlyLocalAssignment.java  |  24 -
 .../executiongraph/ExecutionJobVertex.java      | 104 +----
 .../executiongraph/LocalInputSplitsTest.java    | 436 -------------------
 3 files changed, 8 insertions(+), 556 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java b/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
deleted file mode 100644
index e20107b..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.io;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-@PublicEvolving
-public interface StrictlyLocalAssignment {}

http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7b28b31..6272151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -30,7 +29,6 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -81,11 +79,9 @@ public class ExecutionJobVertex {
 	private final CoLocationGroup coLocationGroup;
 	
 	private final InputSplit[] inputSplits;
-	
-	private List<LocatableInputSplit>[] inputSplitsPerSubtask;
-	
+
 	private InputSplitAssigner splitAssigner;
-	
+
 	public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
 							int defaultParallelism, FiniteDuration timeout) throws JobException {
 		this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
@@ -155,12 +151,7 @@ public class ExecutionJobVertex {
 				inputSplits = splitSource.createInputSplits(numTaskVertices);
 				
 				if (inputSplits != null) {
-					if (splitSource instanceof StrictlyLocalAssignment) {
-						inputSplitsPerSubtask = computeLocalInputSplitsPerTask(inputSplits);
-						splitAssigner = new PredeterminedInputSplitAssigner(inputSplitsPerSubtask);
-					} else {
-						splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
-					}
+					splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
 				}
 			}
 			else {
@@ -278,48 +269,8 @@ public class ExecutionJobVertex {
 	//---------------------------------------------------------------------------------------------
 	
 	public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
-		
 		ExecutionVertex[] vertices = this.taskVertices;
-		
-		// check if we need to do pre-assignment of tasks
-		if (inputSplitsPerSubtask != null) {
-		
-			final Map<String, List<Instance>> instances = scheduler.getInstancesByHost();
-			final Map<String, Integer> assignments = new HashMap<String, Integer>();
-			
-			for (int i = 0; i < vertices.length; i++) {
-				List<LocatableInputSplit> splitsForHost = inputSplitsPerSubtask[i];
-				if (splitsForHost == null || splitsForHost.isEmpty()) {
-					continue;
-				}
-				
-				String[] hostNames = splitsForHost.get(0).getHostnames();
-				if (hostNames == null || hostNames.length == 0 || hostNames[0] == null) {
-					continue;
-				}
-				
-				String host = hostNames[0];
-				ExecutionVertex v = vertices[i];
-				
-				List<Instance> instancesOnHost = instances.get(host);
-				
-				if (instancesOnHost == null || instancesOnHost.isEmpty()) {
-					throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host
-							+ ". No TaskManager available on that host.");
-				}
-				
-				Integer pos = assignments.get(host);
-				if (pos == null) {
-					pos = 0;
-					assignments.put(host, 0);
-				} else {
-					assignments.put(host, (pos + 1) % instancesOnHost.size());
-				}
-				
-				v.setLocationConstraintHosts(Collections.singletonList(instancesOnHost.get(pos)));
-			}
-		}
-		
+
 		// kick off the tasks
 		for (ExecutionVertex ev : vertices) {
 			ev.scheduleForExecution(scheduler, queued);
@@ -374,17 +325,10 @@ public class ExecutionJobVertex {
 			// set up the input splits again
 			try {
 				if (this.inputSplits != null) {
-					
-					if (inputSplitsPerSubtask == null) {
-						// lazy assignment
-						@SuppressWarnings("unchecked")
-						InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
-						this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
-					}
-					else {
-						// eager assignment
-						//TODO: this.splitAssigner = new AssignBasedOnPreAssignment();
-					}
+					// lazy assignment
+					@SuppressWarnings("unchecked")
+					InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
+					this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
 				}
 			}
 			catch (Throwable t) {
@@ -426,7 +370,6 @@ public class ExecutionJobVertex {
 				inputSplits[i] = null;
 			}
 		}
-		inputSplitsPerSubtask = null;
 	}
 	
 	//---------------------------------------------------------------------------------------------
@@ -628,37 +571,6 @@ public class ExecutionJobVertex {
 		
 		return subTaskSplitAssignment;
 	}
-	
-
-	/**
-	 * An InputSplitAssigner that assigns to pre-determined hosts.
-	 */
-	public static class PredeterminedInputSplitAssigner implements InputSplitAssigner {
-
-		private List<LocatableInputSplit>[] inputSplitsPerSubtask;
-
-		@SuppressWarnings("unchecked")
-		public PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] inputSplitsPerSubtask) {
-			// copy input split assignment
-			this.inputSplitsPerSubtask = (List<LocatableInputSplit>[]) new List<?>[inputSplitsPerSubtask.length];
-			for (int i = 0; i < inputSplitsPerSubtask.length; i++) {
-				List<LocatableInputSplit> next = inputSplitsPerSubtask[i];
-				
-				this.inputSplitsPerSubtask[i] = next == null || next.isEmpty() ?
-						Collections.<LocatableInputSplit>emptyList() : 
-						new ArrayList<LocatableInputSplit>(inputSplitsPerSubtask[i]);
-			}
-		}
-
-		@Override
-		public InputSplit getNextInputSplit(String host, int taskId) {
-			if (inputSplitsPerSubtask[taskId].isEmpty()) {
-				return null;
-			} else {
-				return inputSplitsPerSubtask[taskId].remove(inputSplitsPerSubtask[taskId].size() - 1);
-			}
-		}
-	}
 
 	public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
 		if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {

http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
deleted file mode 100644
index f03370c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.StrictlyLocalAssignment;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.core.io.InputSplitSource;
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import scala.concurrent.duration.FiniteDuration;
-
-public class LocalInputSplitsTest {
-	
-	private static final FiniteDuration TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Test
-	public void testNotEnoughSubtasks() {
-		int numHosts = 3;
-		int slotsPerHost = 1;
-		int parallelism = 2;
-		
-		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
-				new TestLocatableInputSplit(1, "host1"),
-				new TestLocatableInputSplit(2, "host2"),
-				new TestLocatableInputSplit(3, "host3")
-		};
-		
-		// This should fail with an exception, since the parallelism of 2 does not
-		// support strictly local assignment onto 3 hosts
-		try {
-			runTests(numHosts, slotsPerHost, parallelism, splits);
-			fail("should throw an exception");
-		}
-		catch (JobException e) {
-			// what a great day!
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testDisallowMultipleLocations() {
-		int numHosts = 2;
-		int slotsPerHost = 1;
-		int parallelism = 2;
-		
-		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
-				new TestLocatableInputSplit(1, new String[] { "host1", "host2" } ),
-				new TestLocatableInputSplit(2, new String[] { "host1", "host2" } )
-		};
-		
-		// This should fail with an exception, since strictly local assignment
-		// currently supports only one choice of host
-		try {
-			runTests(numHosts, slotsPerHost, parallelism, splits);
-			fail("should throw an exception");
-		}
-		catch (JobException e) {
-			// dandy!
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testNonExistentHost() {
-		int numHosts = 2;
-		int slotsPerHost = 1;
-		int parallelism = 2;
-		
-		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
-				new TestLocatableInputSplit(1, "host1"),
-				new TestLocatableInputSplit(2, "bogus_host" )
-		};
-		
-		// This should fail with an exception, since one of the hosts does not exist
-		try {
-			runTests(numHosts, slotsPerHost, parallelism, splits);
-			fail("should throw an exception");
-		}
-		catch (JobException e) {
-			// dandy!
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testEqualSplitsPerHostAndSubtask() {
-		int numHosts = 5;
-		int slotsPerHost = 2;
-		int parallelism = 10;
-		
-		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
-				new TestLocatableInputSplit(7, "host4"),
-				new TestLocatableInputSplit(8, "host4"),
-				new TestLocatableInputSplit(1, "host1"),
-				new TestLocatableInputSplit(2, "host1"),
-				new TestLocatableInputSplit(3, "host2"),
-				new TestLocatableInputSplit(4, "host2"),
-				new TestLocatableInputSplit(5, "host3"),
-				new TestLocatableInputSplit(6, "host3"),
-				new TestLocatableInputSplit(9, "host5"),
-				new TestLocatableInputSplit(10, "host5")
-		};
-		
-		try {
-			String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits);
-			
-			assertEquals("host1", hostsForTasks[0]);
-			assertEquals("host1", hostsForTasks[1]);
-			assertEquals("host2", hostsForTasks[2]);
-			assertEquals("host2", hostsForTasks[3]);
-			assertEquals("host3", hostsForTasks[4]);
-			assertEquals("host3", hostsForTasks[5]);
-			assertEquals("host4", hostsForTasks[6]);
-			assertEquals("host4", hostsForTasks[7]);
-			assertEquals("host5", hostsForTasks[8]);
-			assertEquals("host5", hostsForTasks[9]);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testNonEqualSplitsPerhost() {
-		int numHosts = 3;
-		int slotsPerHost = 2;
-		int parallelism = 5;
-		
-		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
-				new TestLocatableInputSplit(1, "host3"),
-				new TestLocatableInputSplit(2, "host1"),
-				new TestLocatableInputSplit(3, "host1"),
-				new TestLocatableInputSplit(4, "host1"),
-				new TestLocatableInputSplit(5, "host1"),
-				new TestLocatableInputSplit(6, "host1"),
-				new TestLocatableInputSplit(7, "host2"),
-				new TestLocatableInputSplit(8, "host2")
-		};
-		
-		try {
-			String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits);
-			
-			assertEquals("host1", hostsForTasks[0]);
-			assertEquals("host1", hostsForTasks[1]);
-			assertEquals("host2", hostsForTasks[2]);
-			assertEquals("host2", hostsForTasks[3]);
-			assertEquals("host3", hostsForTasks[4]);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWithSubtasksEmpty() {
-		int numHosts = 3;
-		int slotsPerHost = 5;
-		int parallelism = 7;
-		
-		// host one gets three subtasks (but two remain empty)
-		// host two get two subtasks where one gets two splits, the other one split
-		// host three gets two subtasks where one gets five splits, the other gets four splits
-		
-		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
-				new TestLocatableInputSplit(1, "host1"),
-				new TestLocatableInputSplit(2, "host2"),
-				new TestLocatableInputSplit(3, "host2"),
-				new TestLocatableInputSplit(4, "host2"),
-				new TestLocatableInputSplit(5, "host3"),
-				new TestLocatableInputSplit(6, "host3"),
-				new TestLocatableInputSplit(7, "host3"),
-				new TestLocatableInputSplit(8, "host3"),
-				new TestLocatableInputSplit(9, "host3"),
-				new TestLocatableInputSplit(10, "host3"),
-				new TestLocatableInputSplit(11, "host3"),
-				new TestLocatableInputSplit(12, "host3"),
-				new TestLocatableInputSplit(13, "host3")
-		};
-		
-		try {
-			String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits);
-			
-			assertEquals("host1", hostsForTasks[0]);
-			
-			assertEquals("host2", hostsForTasks[1]);
-			assertEquals("host2", hostsForTasks[2]);
-
-			assertEquals("host3", hostsForTasks[3]);
-			assertEquals("host3", hostsForTasks[4]);
-			
-			// the current assignment leaves those with empty constraints
-			assertTrue(hostsForTasks[5].equals("host1") || hostsForTasks[5].equals("host2") || hostsForTasks[5].equals("host3"));
-			assertTrue(hostsForTasks[6].equals("host1") || hostsForTasks[6].equals("host2") || hostsForTasks[6].equals("host3"));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultipleInstancesPerHost() {
-
-		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
-				new TestLocatableInputSplit(1, "host1"),
-				new TestLocatableInputSplit(2, "host1"),
-				new TestLocatableInputSplit(3, "host2"),
-				new TestLocatableInputSplit(4, "host2"),
-				new TestLocatableInputSplit(5, "host3"),
-				new TestLocatableInputSplit(6, "host3")
-		};
-		
-		try {
-			JobVertex vertex = new JobVertex("test vertex");
-			vertex.setParallelism(6);
-			vertex.setInvokableClass(DummyInvokable.class);
-			vertex.setInputSplitSource(new TestInputSplitSource(splits));
-			
-			JobGraph jobGraph = new JobGraph("test job", vertex);
-			
-			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
-				jobGraph.getJobID(),
-				jobGraph.getName(),  
-				jobGraph.getJobConfiguration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				TIMEOUT,
-				new NoRestartStrategy());
-			
-			eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-			eg.setQueuedSchedulingAllowed(false);
-			
-			// create a scheduler with 6 instances where always two are on the same host
-			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-			Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, "host1", 1);
-			Instance i2 = getInstance(new byte[] {10,0,1,1}, 12346, "host1", 1);
-			Instance i3 = getInstance(new byte[] {10,0,1,2}, 12345, "host2", 1);
-			Instance i4 = getInstance(new byte[] {10,0,1,2}, 12346, "host2", 1);
-			Instance i5 = getInstance(new byte[] {10,0,1,3}, 12345, "host3", 1);
-			Instance i6 = getInstance(new byte[] {10,0,1,3}, 12346, "host4", 1);
-			scheduler.newInstanceAvailable(i1);
-			scheduler.newInstanceAvailable(i2);
-			scheduler.newInstanceAvailable(i3);
-			scheduler.newInstanceAvailable(i4);
-			scheduler.newInstanceAvailable(i5);
-			scheduler.newInstanceAvailable(i6);
-			
-			eg.scheduleForExecution(scheduler);
-			
-			ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices();
-			assertEquals(6, tasks.length);
-			
-			Instance taskInstance1 = tasks[0].getCurrentAssignedResource().getInstance();
-			Instance taskInstance2 = tasks[1].getCurrentAssignedResource().getInstance();
-			Instance taskInstance3 = tasks[2].getCurrentAssignedResource().getInstance();
-			Instance taskInstance4 = tasks[3].getCurrentAssignedResource().getInstance();
-			Instance taskInstance5 = tasks[4].getCurrentAssignedResource().getInstance();
-			Instance taskInstance6 = tasks[5].getCurrentAssignedResource().getInstance();
-			
-			assertTrue (taskInstance1 == i1 || taskInstance1 == i2);
-			assertTrue (taskInstance2 == i1 || taskInstance2 == i2);
-			assertTrue (taskInstance3 == i3 || taskInstance3 == i4);
-			assertTrue (taskInstance4 == i3 || taskInstance4 == i4);
-			assertTrue (taskInstance5 == i5 || taskInstance5 == i6);
-			assertTrue (taskInstance6 == i5 || taskInstance6 == i6);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private static String[] runTests(int numHosts, int slotsPerHost, int parallelism, 
-			TestLocatableInputSplit[] splits)
-		throws Exception
-	{
-		JobVertex vertex = new JobVertex("test vertex");
-		vertex.setParallelism(parallelism);
-		vertex.setInvokableClass(DummyInvokable.class);
-		vertex.setInputSplitSource(new TestInputSplitSource(splits));
-		
-		JobGraph jobGraph = new JobGraph("test job", vertex);
-		
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			jobGraph.getJobID(),
-			jobGraph.getName(),  
-			jobGraph.getJobConfiguration(),
-				new SerializedValue<>(new ExecutionConfig()),
-			TIMEOUT,
-			new NoRestartStrategy());
-		
-		eg.setQueuedSchedulingAllowed(false);
-		
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-		
-		Scheduler scheduler = getScheduler(numHosts, slotsPerHost);
-		eg.scheduleForExecution(scheduler);
-		
-		ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices();
-		assertEquals(parallelism, tasks.length);
-		
-		String[] hostsForTasks = new String[parallelism];
-		for (int i = 0; i < parallelism; i++) {
-			hostsForTasks[i] = tasks[i].getCurrentAssignedResourceLocation().getHostname();
-		}
-		
-		return hostsForTasks;
-	}
-	
-	private static Scheduler getScheduler(int numInstances, int numSlotsPerInstance) throws Exception {
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		
-		for (int i = 0; i < numInstances; i++) {
-			byte[] ipAddress = new byte[] { 10, 0, 1, (byte) (1 + i) };
-			int dataPort = 12001 + i;
-			String host = "host" + (i+1);
-			
-			Instance instance = getInstance(ipAddress, dataPort, host, numSlotsPerInstance);
-			scheduler.newInstanceAvailable(instance);
-		}
-		return scheduler;
-	}
-	
-	private static Instance getInstance(byte[] ipAddress, int dataPort, String hostname, int slots) throws Exception {
-		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
-		
-		InstanceConnectionInfo connection = mock(InstanceConnectionInfo.class);
-		when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress));
-		when(connection.dataPort()).thenReturn(dataPort);
-		when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
-		when(connection.getHostname()).thenReturn(hostname);
-		when(connection.getFQDNHostname()).thenReturn(hostname);
-		
-		return new Instance(
-				new ExecutionGraphTestUtils.SimpleActorGateway(
-						TestingUtils.defaultExecutionContext()),
-				connection,
-				ResourceID.generate(),
-				new InstanceID(),
-				hardwareDescription,
-				slots);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	// custom class to ensure behavior works for subclasses of LocatableInputSplit
-	private static class TestLocatableInputSplit extends LocatableInputSplit {
-		
-		private static final long serialVersionUID = 1L;
-
-		public TestLocatableInputSplit(int splitNumber, String hostname) {
-			super(splitNumber, hostname);
-		}
-		
-		public TestLocatableInputSplit(int splitNumber, String[] hostnames) {
-			super(splitNumber, hostnames);
-		}
-	}
-	
-	private static class TestInputSplitSource implements InputSplitSource<TestLocatableInputSplit>,
-		StrictlyLocalAssignment
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final TestLocatableInputSplit[] splits;
-		
-		public TestInputSplitSource(TestLocatableInputSplit[] splits) {
-			this.splits = splits;
-		}
-
-		@Override
-		public TestLocatableInputSplit[] createInputSplits(int minNumSplits) {
-			return splits;
-		}
-		
-		@Override
-		public InputSplitAssigner getInputSplitAssigner(TestLocatableInputSplit[] inputSplits) {
-			fail("This method should not be called on StrictlyLocalAssignment splits.");
-			return null; // silence the compiler
-		}
-	}
-}