You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/12/12 15:57:18 UTC
incubator-flink git commit: [FLINK-1287] LocalizableSplitAssigner
prefers splits with less degrees of freedom
Repository: incubator-flink
Updated Branches:
refs/heads/master 3799560e1 -> e0a4ee070
[FLINK-1287] LocalizableSplitAssigner prefers splits with less degrees of freedom
This closes #258
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e0a4ee07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e0a4ee07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e0a4ee07
Branch: refs/heads/master
Commit: e0a4ee07084bc6ab56a20fbc4a18863462da93eb
Parents: 3799560
Author: Fabian Hueske <fh...@apache.org>
Authored: Sun Dec 7 22:28:22 2014 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 12 15:53:17 2014 +0100
----------------------------------------------------------------------
.../common/io/LocatableInputSplitAssigner.java | 343 ++++++++++++++-----
.../core/io/LocatableSplitAssignerTest.java | 178 +++++++++-
2 files changed, 430 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0a4ee07/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
index 6fbde49..92fbdca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
@@ -18,12 +18,9 @@
package org.apache.flink.api.common.io;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
+import java.util.LinkedList;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -36,89 +33,111 @@ import org.apache.flink.util.NetUtils;
/**
* The locatable input split assigner assigns to each host splits that are local, before assigning
- * splits that are not local.
+ * splits that are not local.
*/
public final class LocatableInputSplitAssigner implements InputSplitAssigner {
private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
+ // unassigned input splits
+ private final Set<LocatableInputSplitWithCount> unassigned = new HashSet<LocatableInputSplitWithCount>();
+
+ // input splits indexed by host for local assignment
+ private final ConcurrentHashMap<String, LocatableInputSplitChooser> localPerHost = new ConcurrentHashMap<String, LocatableInputSplitChooser>();
+
+ // unassigned splits for remote assignment
+ private final LocatableInputSplitChooser remoteSplitChooser;
- private final Set<LocatableInputSplit> unassigned = new HashSet<LocatableInputSplit>();
-
- private final ConcurrentHashMap<String, List<LocatableInputSplit>> localPerHost = new ConcurrentHashMap<String, List<LocatableInputSplit>>();
-
private int localAssignments; // lock protected by the unassigned set lock
-
+
private int remoteAssignments; // lock protected by the unassigned set lock
// --------------------------------------------------------------------------------------------
-
+
public LocatableInputSplitAssigner(Collection<LocatableInputSplit> splits) {
- this.unassigned.addAll(splits);
+ for(LocatableInputSplit split : splits) {
+ this.unassigned.add(new LocatableInputSplitWithCount(split));
+ }
+ this.remoteSplitChooser = new LocatableInputSplitChooser(unassigned);
}
-
+
public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
- Collections.addAll(this.unassigned, splits);
+ for(LocatableInputSplit split : splits) {
+ this.unassigned.add(new LocatableInputSplitWithCount(split));
+ }
+ this.remoteSplitChooser = new LocatableInputSplitChooser(unassigned);
}
-
+
// --------------------------------------------------------------------------------------------
@Override
public LocatableInputSplit getNextInputSplit(String host) {
- // for a null host, we return an arbitrary split
+
+ // for a null host, we return a remote split
if (host == null) {
-
- synchronized (this.unassigned) {
- Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
- if (iter.hasNext()) {
- LocatableInputSplit next = iter.next();
- iter.remove();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Assigning split to null host (random assignment).");
- }
-
- remoteAssignments++;
- return next;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No more unassigned input splits remaining.");
+ synchronized (this.remoteSplitChooser) {
+ synchronized (this.unassigned) {
+
+ LocatableInputSplitWithCount split = this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned);
+
+ if (split != null) {
+ // got a split to assign. Double check that it hasn't been assigned before.
+ if (this.unassigned.remove(split)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Assigning split to null host (random assignment).");
+ }
+
+ remoteAssignments++;
+ return split.getSplit();
+ } else {
+ throw new IllegalStateException("Chosen InputSplit has already been assigned. This should not happen!");
+ }
+ } else {
+ // all splits consumed
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No more unassigned input splits remaining.");
+ }
+ return null;
}
- return null;
}
}
}
-
+
host = host.toLowerCase(Locale.US);
-
+
// for any non-null host, we take the list of non-null splits
- List<LocatableInputSplit> localSplits = this.localPerHost.get(host);
-
+ LocatableInputSplitChooser localSplits = this.localPerHost.get(host);
+
// if we have no list for this host yet, create one
if (localSplits == null) {
- localSplits = new ArrayList<LocatableInputSplit>(16);
-
+ localSplits = new LocatableInputSplitChooser();
+
// lock the list, to be sure that others have to wait for that host's local list
synchronized (localSplits) {
- List<LocatableInputSplit> prior = this.localPerHost.putIfAbsent(host, localSplits);
-
+ LocatableInputSplitChooser prior = this.localPerHost.putIfAbsent(host, localSplits);
+
// if someone else beat us in the case to create this list, then we do not populate this one, but
// simply work with that other list
if (prior == null) {
// we are the first, we populate
-
+
// first, copy the remaining splits to release the lock on the set early
// because that is shared among threads
- LocatableInputSplit[] remaining;
+ LocatableInputSplitWithCount[] remaining;
synchronized (this.unassigned) {
- remaining = (LocatableInputSplit[]) this.unassigned.toArray(new LocatableInputSplit[this.unassigned.size()]);
+ remaining = this.unassigned.toArray(new LocatableInputSplitWithCount[this.unassigned.size()]);
}
-
- for (LocatableInputSplit is : remaining) {
- if (isLocal(host, is.getHostnames())) {
- localSplits.add(is);
+
+ for (LocatableInputSplitWithCount isw : remaining) {
+ if (isLocal(host, isw.getSplit().getHostnames())) {
+ // Split is local on host.
+ // Increment local count
+ isw.incrementLocalCount();
+ // and add to local split list
+ localSplits.addInputSplit(isw);
}
}
+
}
else {
// someone else was faster
@@ -126,55 +145,61 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
}
}
}
-
-
+
+
// at this point, we have a list of local splits (possibly empty)
// we need to make sure no one else operates in the current list (that protects against
// list creation races) and that the unassigned set is consistent
// NOTE: we need to obtain the locks in this order, strictly!!!
synchronized (localSplits) {
- int size = localSplits.size();
- if (size > 0) {
- synchronized (this.unassigned) {
- do {
- --size;
- LocatableInputSplit split = localSplits.remove(size);
- if (this.unassigned.remove(split)) {
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Assigning local split to host " + host);
- }
-
- localAssignments++;
- return split;
+ synchronized (this.unassigned) {
+
+ LocatableInputSplitWithCount split = localSplits.getNextUnassignedMinLocalCountSplit(this.unassigned);
+
+ if (split != null) {
+ // found a valid split. Double check that it hasn't been assigned before.
+ if (this.unassigned.remove(split)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Assigning local split to host " + host);
}
- } while (size > 0);
+
+ localAssignments++;
+ return split.getSplit();
+ } else {
+ throw new IllegalStateException("Chosen InputSplit has already been assigned. This should not happen!");
+ }
}
}
}
-
- // we did not find a local split, return any
- synchronized (this.unassigned) {
- Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
- if (iter.hasNext()) {
- LocatableInputSplit next = iter.next();
- iter.remove();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Assigning remote split to host " + host);
- }
-
- remoteAssignments++;
- return next;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No more input splits remaining.");
+
+ // we did not find a local split, return a remote split
+ synchronized (this.remoteSplitChooser) {
+ synchronized (this.unassigned) {
+ LocatableInputSplitWithCount split = this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned);
+
+ if (split != null) {
+ // found a valid split. Double check that it hasn't been assigned yet.
+ if (this.unassigned.remove(split)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Assigning remote split to host " + host);
+ }
+
+ remoteAssignments++;
+ return split.getSplit();
+ } else {
+ throw new IllegalStateException("Chosen InputSplit has already been assigned. This should not happen!");
+ }
+ } else {
+ // all splits consumed
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No more input splits remaining.");
+ }
+ return null;
}
- return null;
}
}
}
-
+
private static final boolean isLocal(String flinkHost, String[] hosts) {
if (flinkHost == null || hosts == null) {
return false;
@@ -184,15 +209,159 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
return true;
}
}
-
+
return false;
}
-
+
public int getNumberOfLocalAssignments() {
return localAssignments;
}
-
+
public int getNumberOfRemoteAssignments() {
return remoteAssignments;
}
+
+ /**
+ * Wraps a LocatableInputSplit and adds a count for the number of observed hosts
+ * that can access the split locally.
+ */
+ private static class LocatableInputSplitWithCount {
+
+ private final LocatableInputSplit split;
+ private int localCount;
+
+ public LocatableInputSplitWithCount(LocatableInputSplit split) {
+ this.split = split;
+ this.localCount = 0;
+ }
+
+ public void incrementLocalCount() {
+ this.localCount++;
+ }
+
+ public int getLocalCount() {
+ return this.localCount;
+ }
+
+ public LocatableInputSplit getSplit() {
+ return this.split;
+ }
+
+ }
+
+ /**
+ * Holds a list of LocatableInputSplits and returns the split with the lowest local count.
+ * The rational is that splits which are local on few hosts should be preferred over others which
+ * have more degrees of freedom for local assignment.
+ *
+ * Internally, the splits are stored in a linked list. Sorting the list is not a good solution,
+ * as local counts are updated whenever a previously unseen host requests a split.
+ * Instead, we track the minimum local count and iteratively look for splits with that minimum count.
+ */
+ private static class LocatableInputSplitChooser {
+
+ // list of input splits
+ private final LinkedList<LocatableInputSplitWithCount> splits;
+
+ // the current minimum local count. We look for splits with this local count.
+ private int minLocalCount = -1;
+ // the second smallest count observed so far.
+ private int nextMinLocalCount = -1;
+ // number of elements we need to inspect for the minimum local count.
+ private int elementCycleCount = 0;
+
+ public LocatableInputSplitChooser() {
+ this.splits = new LinkedList<LocatableInputSplitWithCount>();
+ }
+
+ public LocatableInputSplitChooser(Collection<LocatableInputSplitWithCount> splits) {
+ this.splits = new LinkedList<LocatableInputSplitWithCount>();
+ for(LocatableInputSplitWithCount isw : splits) {
+ this.addInputSplit(isw);
+ }
+ }
+
+ /**
+ * Adds a single input split
+ *
+ * @param split The input split to add
+ */
+ public void addInputSplit(LocatableInputSplitWithCount split) {
+ int localCount = split.getLocalCount();
+
+ if (minLocalCount == -1) {
+ // first split to add
+ this.minLocalCount = localCount;
+ this.elementCycleCount = 1;
+ this.splits.offerFirst(split);
+ } else if (localCount < minLocalCount) {
+ // split with new min local count
+ this.nextMinLocalCount = this.minLocalCount;
+ this.minLocalCount = localCount;
+ // all other splits have more local host than this one
+ this.elementCycleCount = 1;
+ splits.offerFirst(split);
+ } else if (localCount == minLocalCount ) {
+ this.elementCycleCount++;
+ this.splits.offerFirst(split);
+ } else {
+ if (localCount < nextMinLocalCount) {
+ nextMinLocalCount = localCount;
+ }
+ splits.offerLast(split);
+ }
+ }
+
+ /**
+ * Retrieves a LocatableInputSplit with minimum local count.
+ * InputSplits which have already been assigned (i.e., which are not contained in the provided set) are filtered out.
+ * The returned input split is NOT removed from the provided set.
+ *
+ * @param unassignedSplits Set of unassigned input splits.
+ * @return An input split with minimum local count or null if all splits have been assigned.
+ */
+ public LocatableInputSplitWithCount getNextUnassignedMinLocalCountSplit(Set<LocatableInputSplitWithCount> unassignedSplits) {
+
+ if(splits.size() == 0) {
+ return null;
+ }
+
+ do {
+ elementCycleCount--;
+ // take first split of the list
+ LocatableInputSplitWithCount split = splits.pollFirst();
+ if (unassignedSplits.contains(split)) {
+ int localCount = split.getLocalCount();
+ // still unassigned, check local count
+ if (localCount > minLocalCount) {
+ // re-insert at end of the list and continue to look for split with smaller local count
+ splits.offerLast(split);
+ // check and update second smallest local count
+ if (nextMinLocalCount == -1 || split.getLocalCount() < nextMinLocalCount) {
+ nextMinLocalCount = split.getLocalCount();
+ }
+ split = null;
+ }
+ } else {
+ // split was already assigned
+ split = null;
+ }
+ if(elementCycleCount == 0) {
+ // one full cycle, but no split with min local count found
+ // update minLocalCnt and element cycle count for next pass over the splits
+ minLocalCount = nextMinLocalCount;
+ nextMinLocalCount = -1;
+ elementCycleCount = splits.size();
+ }
+ if (split != null) {
+ // found a split to assign
+ return split;
+ }
+ } while (elementCycleCount > 0);
+
+ // no split left
+ return null;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0a4ee07/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
index a962129..7edad43 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
@@ -20,7 +20,10 @@ package org.apache.flink.core.io;
import static org.junit.Assert.*;
+import java.util.Arrays;
+import java.util.Calendar;
import java.util.HashSet;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -132,17 +135,140 @@ public class LocatableSplitAssignerTest {
}
@Test
+ public void testSerialSplitAssignmentSomeForRemoteHost() {
+ try {
+
+ // host1 reads all local
+ // host2 reads 10 local and 10 remote
+ // host3 reads all remote
+ final String[] hosts = { "host1", "host2", "host3" };
+ final int NUM_LOCAL_HOST1_SPLITS = 20;
+ final int NUM_LOCAL_HOST2_SPLITS = 10;
+ final int NUM_REMOTE_SPLITS = 30;
+ final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + NUM_LOCAL_HOST2_SPLITS;
+
+ // load local splits
+ int splitCnt = 0;
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ // host1 splits
+ for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(splitCnt++, "host1"));
+ }
+ // host2 splits
+ for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(splitCnt++, "host2"));
+ }
+ // load remote splits
+ for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
+ }
+
+ // get all available splits
+ LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+ InputSplit is = null;
+ int i = 0;
+ while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) {
+ assertTrue(splits.remove(is));
+ }
+
+ // check we had all
+ assertTrue(splits.isEmpty());
+ assertNull(ia.getNextInputSplit("anotherHost"));
+
+ assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
+ assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerialSplitAssignmentMultiLocalHost() {
+ try {
+
+ final String[] localHosts = { "local1", "local2", "local3" };
+ final String[] remoteHosts = { "remote1", "remote2", "remote3" };
+ final String[] requestingHosts = { "local3", "local2", "local1", "other" };
+
+ final int NUM_THREE_LOCAL_SPLITS = 10;
+ final int NUM_TWO_LOCAL_SPLITS = 10;
+ final int NUM_ONE_LOCAL_SPLITS = 10;
+ final int NUM_LOCAL_SPLITS = 30;
+ final int NUM_REMOTE_SPLITS = 10;
+ final int NUM_SPLITS = 40;
+
+ String[] threeLocalHosts = localHosts;
+ String[] twoLocalHosts = {localHosts[0], localHosts[1], remoteHosts[0]};
+ String[] oneLocalHost = {localHosts[0], remoteHosts[0], remoteHosts[1]};
+ String[] noLocalHost = remoteHosts;
+
+ int splitCnt = 0;
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ // add splits with three local hosts
+ for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(splitCnt++, threeLocalHosts));
+ }
+ // add splits with two local hosts
+ for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
+ }
+ // add splits with two local hosts
+ for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
+ }
+ // add splits with two local hosts
+ for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+ splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
+ }
+
+ // get all available splits
+ LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+ LocatableInputSplit is = null;
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ String host = requestingHosts[i % requestingHosts.length];
+ is = ia.getNextInputSplit(host);
+ // check valid split
+ assertTrue(is != null);
+ // check unassigned split
+ assertTrue(splits.remove(is));
+ // check priority of split
+ if (host.equals(localHosts[0])) {
+ assertTrue(Arrays.equals(is.getHostnames(), oneLocalHost));
+ } else if (host.equals(localHosts[1])) {
+ assertTrue(Arrays.equals(is.getHostnames(), twoLocalHosts));
+ } else if (host.equals(localHosts[2])) {
+ assertTrue(Arrays.equals(is.getHostnames(), threeLocalHosts));
+ } else {
+ assertTrue(Arrays.equals(is.getHostnames(), noLocalHost));
+ }
+ }
+ // check we had all
+ assertTrue(splits.isEmpty());
+ assertNull(ia.getNextInputSplit("anotherHost"));
+
+ assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
+ assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void testSerialSplitAssignmentMixedLocalHost() {
try {
final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
final int NUM_SPLITS = 10 * hosts.length;
-
+
// load some splits
Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
for (int i = 0; i < NUM_SPLITS; i++) {
splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
}
-
+
// get all available splits
LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
InputSplit is = null;
@@ -150,11 +276,11 @@ public class LocatableSplitAssignerTest {
while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) {
assertTrue(splits.remove(is));
}
-
+
// check we had all
assertTrue(splits.isEmpty());
assertNull(ia.getNextInputSplit("anotherHost"));
-
+
assertEquals(0, ia.getNumberOfRemoteAssignments());
assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
}
@@ -380,4 +506,48 @@ public class LocatableSplitAssignerTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testAssignmentOfManySplitsRandomly() {
+
+ long seed = Calendar.getInstance().getTimeInMillis();
+
+ final int NUM_SPLITS = 65536;
+ final String[] splitHosts = new String[256];
+ final String[] requestingHosts = new String[256];
+ final Random rand = new Random(seed);
+
+ for (int i = 0; i < splitHosts.length; i++) {
+ splitHosts[i] = "localHost" + i;
+ }
+ for (int i = 0; i < requestingHosts.length; i++) {
+ if (i % 2 == 0) {
+ requestingHosts[i] = "localHost" + i;
+ } else {
+ requestingHosts[i] = "remoteHost" + i;
+ }
+ }
+
+ String[] stringArray = {};
+ Set<String> hosts = new HashSet<String>();
+ Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ while (hosts.size() < 3) {
+ hosts.add(splitHosts[rand.nextInt(splitHosts.length)]);
+ }
+ splits.add(new LocatableInputSplit(i, hosts.toArray(stringArray)));
+ hosts.clear();
+ }
+
+ final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+
+ for (int i = 0; i < NUM_SPLITS; i++) {
+ LocatableInputSplit split = ia.getNextInputSplit(requestingHosts[rand.nextInt(requestingHosts.length)]);
+ assertTrue(split != null);
+ assertTrue(splits.remove(split));
+ }
+
+ assertTrue(splits.isEmpty());
+ assertNull(ia.getNextInputSplit("testHost"));
+ }
}