You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/12/12 15:57:18 UTC

incubator-flink git commit: [FLINK-1287] LocalizableSplitAssigner prefers splits with less degrees of freedom

Repository: incubator-flink
Updated Branches:
  refs/heads/master 3799560e1 -> e0a4ee070


[FLINK-1287] LocalizableSplitAssigner prefers splits with less degrees of freedom

This closes #258


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e0a4ee07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e0a4ee07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e0a4ee07

Branch: refs/heads/master
Commit: e0a4ee07084bc6ab56a20fbc4a18863462da93eb
Parents: 3799560
Author: Fabian Hueske <fh...@apache.org>
Authored: Sun Dec 7 22:28:22 2014 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 12 15:53:17 2014 +0100

----------------------------------------------------------------------
 .../common/io/LocatableInputSplitAssigner.java  | 343 ++++++++++++++-----
 .../core/io/LocatableSplitAssignerTest.java     | 178 +++++++++-
 2 files changed, 430 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0a4ee07/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
index 6fbde49..92fbdca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
@@ -18,12 +18,9 @@
 
 package org.apache.flink.api.common.io;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
+import java.util.LinkedList;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -36,89 +33,111 @@ import org.apache.flink.util.NetUtils;
 
 /**
  * The locatable input split assigner assigns to each host splits that are local, before assigning
- * splits that are not local. 
+ * splits that are not local.
  */
 public final class LocatableInputSplitAssigner implements InputSplitAssigner {
 
 	private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
 
+	// unassigned input splits
+	private final Set<LocatableInputSplitWithCount> unassigned = new HashSet<LocatableInputSplitWithCount>();
+
+	// input splits indexed by host for local assignment
+	private final ConcurrentHashMap<String, LocatableInputSplitChooser> localPerHost = new ConcurrentHashMap<String, LocatableInputSplitChooser>();
+
+	// unassigned splits for remote assignment
+	private final LocatableInputSplitChooser remoteSplitChooser;
 
-	private final Set<LocatableInputSplit> unassigned = new HashSet<LocatableInputSplit>();
-	
-	private final ConcurrentHashMap<String, List<LocatableInputSplit>> localPerHost = new ConcurrentHashMap<String, List<LocatableInputSplit>>();
-	
 	private int localAssignments;		// lock protected by the unassigned set lock
-	
+
 	private int remoteAssignments;		// lock protected by the unassigned set lock
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	public LocatableInputSplitAssigner(Collection<LocatableInputSplit> splits) {
-		this.unassigned.addAll(splits);
+		for(LocatableInputSplit split : splits) {
+			this.unassigned.add(new LocatableInputSplitWithCount(split));
+		}
+		this.remoteSplitChooser = new LocatableInputSplitChooser(unassigned);
 	}
-	
+
 	public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
-		Collections.addAll(this.unassigned, splits);
+		for(LocatableInputSplit split : splits) {
+			this.unassigned.add(new LocatableInputSplitWithCount(split));
+		}
+		this.remoteSplitChooser = new LocatableInputSplitChooser(unassigned);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public LocatableInputSplit getNextInputSplit(String host) {
-		// for a null host, we return an arbitrary split
+
+		// for a null host, we return a remote split
 		if (host == null) {
-			
-			synchronized (this.unassigned) {
-				Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
-				if (iter.hasNext()) {
-					LocatableInputSplit next = iter.next();
-					iter.remove();
-					
-					if (LOG.isInfoEnabled()) {
-						LOG.info("Assigning split to null host (random assignment).");
-					}
-					
-					remoteAssignments++;
-					return next;
-				} else {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("No more unassigned input splits remaining.");
+			synchronized (this.remoteSplitChooser) {
+				synchronized (this.unassigned) {
+
+					LocatableInputSplitWithCount split = this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned);
+
+					if (split != null) {
+						// got a split to assign. Double check that it hasn't been assigned before.
+						if (this.unassigned.remove(split)) {
+							if (LOG.isInfoEnabled()) {
+								LOG.info("Assigning split to null host (random assignment).");
+							}
+
+							remoteAssignments++;
+							return split.getSplit();
+						} else {
+							throw new IllegalStateException("Chosen InputSplit has already been assigned. This should not happen!");
+						}
+					} else {
+						// all splits consumed
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("No more unassigned input splits remaining.");
+						}
+						return null;
 					}
-					return null;
 				}
 			}
 		}
-		
+
 		host = host.toLowerCase(Locale.US);
-		
+
 		// for any non-null host, we take the list of non-null splits
-		List<LocatableInputSplit> localSplits = this.localPerHost.get(host);
-		
+		LocatableInputSplitChooser localSplits = this.localPerHost.get(host);
+
 		// if we have no list for this host yet, create one
 		if (localSplits == null) {
-			localSplits = new ArrayList<LocatableInputSplit>(16);
-			
+			localSplits = new LocatableInputSplitChooser();
+
 			// lock the list, to be sure that others have to wait for that host's local list
 			synchronized (localSplits) {
-				List<LocatableInputSplit> prior = this.localPerHost.putIfAbsent(host, localSplits);
-				
+				LocatableInputSplitChooser prior = this.localPerHost.putIfAbsent(host, localSplits);
+
 				// if someone else beat us in the case to create this list, then we do not populate this one, but
 				// simply work with that other list
 				if (prior == null) {
 					// we are the first, we populate
-					
+
 					// first, copy the remaining splits to release the lock on the set early
 					// because that is shared among threads
-					LocatableInputSplit[] remaining;
+					LocatableInputSplitWithCount[] remaining;
 					synchronized (this.unassigned) {
-						remaining = (LocatableInputSplit[]) this.unassigned.toArray(new LocatableInputSplit[this.unassigned.size()]);
+						remaining = this.unassigned.toArray(new LocatableInputSplitWithCount[this.unassigned.size()]);
 					}
-					
-					for (LocatableInputSplit is : remaining) {
-						if (isLocal(host, is.getHostnames())) {
-							localSplits.add(is);
+
+					for (LocatableInputSplitWithCount isw : remaining) {
+						if (isLocal(host, isw.getSplit().getHostnames())) {
+							// Split is local on host.
+							// Increment local count
+							isw.incrementLocalCount();
+                            // and add to local split list
+							localSplits.addInputSplit(isw);
 						}
 					}
+
 				}
 				else {
 					// someone else was faster
@@ -126,55 +145,61 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
 				}
 			}
 		}
-		
-		
+
+
 		// at this point, we have a list of local splits (possibly empty)
 		// we need to make sure no one else operates in the current list (that protects against
 		// list creation races) and that the unassigned set is consistent
 		// NOTE: we need to obtain the locks in this order, strictly!!!
 		synchronized (localSplits) {
-			int size = localSplits.size();
-			if (size > 0) {
-				synchronized (this.unassigned) {
-					do {
-						--size;
-						LocatableInputSplit split = localSplits.remove(size);
-						if (this.unassigned.remove(split)) {
-							
-							if (LOG.isInfoEnabled()) {
-								LOG.info("Assigning local split to host " + host);
-							}
-							
-							localAssignments++;
-							return split;
+			synchronized (this.unassigned) {
+
+				LocatableInputSplitWithCount split = localSplits.getNextUnassignedMinLocalCountSplit(this.unassigned);
+
+				if (split != null) {
+					// found a valid split. Double check that it hasn't been assigned before.
+					if (this.unassigned.remove(split)) {
+						if (LOG.isInfoEnabled()) {
+							LOG.info("Assigning local split to host " + host);
 						}
-					} while (size > 0);
+
+						localAssignments++;
+						return split.getSplit();
+					} else {
+						throw new IllegalStateException("Chosen InputSplit has already been assigned. This should not happen!");
+					}
 				}
 			}
 		}
-		
-		// we did not find a local split, return any
-		synchronized (this.unassigned) {
-			Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
-			if (iter.hasNext()) {
-				LocatableInputSplit next = iter.next();
-				iter.remove();
-				
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Assigning remote split to host " + host);
-				}
-				
-				remoteAssignments++;
-				return next;
-			} else {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("No more input splits remaining.");
+
+		// we did not find a local split, return a remote split
+		synchronized (this.remoteSplitChooser) {
+			synchronized (this.unassigned) {
+				LocatableInputSplitWithCount split = this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned);
+
+				if (split != null) {
+					// found a valid split. Double check that it hasn't been assigned yet.
+					if (this.unassigned.remove(split)) {
+						if (LOG.isInfoEnabled()) {
+							LOG.info("Assigning remote split to host " + host);
+						}
+
+						remoteAssignments++;
+						return split.getSplit();
+					} else {
+						throw new IllegalStateException("Chosen InputSplit has already been assigned. This should not happen!");
+					}
+				} else {
+					// all splits consumed
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("No more input splits remaining.");
+					}
+					return null;
 				}
-				return null;
 			}
 		}
 	}
-	
+
 	private static final boolean isLocal(String flinkHost, String[] hosts) {
 		if (flinkHost == null || hosts == null) {
 			return false;
@@ -184,15 +209,159 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
 				return true;
 			}
 		}
-		
+
 		return false;
 	}
-	
+
 	public int getNumberOfLocalAssignments() {
 		return localAssignments;
 	}
-	
+
 	public int getNumberOfRemoteAssignments() {
 		return remoteAssignments;
 	}
+
+	/**
+	 * Wraps a LocatableInputSplit and counts the observed hosts that can access the split locally.
+	 * The count is incremented while only a per-host lock is held, so its accessors synchronize on this.
+	 */
+	private static class LocatableInputSplitWithCount {
+
+		private final LocatableInputSplit split;
+		private int localCount;		// guarded by this
+
+		public LocatableInputSplitWithCount(LocatableInputSplit split) {
+			this.split = split;
+			this.localCount = 0;
+		}
+
+		public synchronized void incrementLocalCount() {
+			this.localCount++;
+		}
+
+		public synchronized int getLocalCount() {
+			return this.localCount;
+		}
+
+		public LocatableInputSplit getSplit() {
+			return this.split;
+		}
+
+	}
+
+	/**
+	 * Holds a list of LocatableInputSplits and returns the split with the lowest local count.
+	 * The rationale is that splits which are local on few hosts should be preferred over others which
+	 * have more degrees of freedom for local assignment.
+	 *
+	 * Internally, the splits are stored in a linked list. Sorting the list is not a good solution,
+	 * as local counts are updated whenever a previously unseen host requests a split.
+	 * Instead, we track the minimum local count and iteratively look for splits with that minimum count.
+	 */
+	private static class LocatableInputSplitChooser {
+
+		// list of input splits
+		private final LinkedList<LocatableInputSplitWithCount> splits;
+
+		// the current minimum local count. We look for splits with this local count.
+		private int minLocalCount = -1;
+		// the second smallest count observed so far (-1 if none has been observed yet).
+		private int nextMinLocalCount = -1;
+		// number of elements at the head of the list that remain to be inspected in the current pass.
+		private int elementCycleCount = 0;
+
+		public LocatableInputSplitChooser() {
+			this.splits = new LinkedList<LocatableInputSplitWithCount>();
+		}
+
+		public LocatableInputSplitChooser(Collection<LocatableInputSplitWithCount> splits) {
+			this.splits = new LinkedList<LocatableInputSplitWithCount>();
+			for (LocatableInputSplitWithCount isw : splits) {
+				this.addInputSplit(isw);
+			}
+		}
+
+		/**
+		 * Adds a single input split. Splits with the minimum local count go to the head of the list, others to its tail.
+		 *
+		 * @param split The input split to add
+		 */
+		public void addInputSplit(LocatableInputSplitWithCount split) {
+			int localCount = split.getLocalCount();
+
+			if (minLocalCount == -1) {
+				// first split to add
+				this.minLocalCount = localCount;
+				this.elementCycleCount = 1;
+				this.splits.offerFirst(split);
+			} else if (localCount < minLocalCount) {
+				// split with new min local count
+				this.nextMinLocalCount = this.minLocalCount;
+				this.minLocalCount = localCount;
+				// all other splits are local on more hosts than this one
+				this.elementCycleCount = 1;
+				this.splits.offerFirst(split);
+			} else if (localCount == minLocalCount) {
+				this.elementCycleCount++;
+				this.splits.offerFirst(split);
+			} else {	// larger than the minimum: append and track it as second smallest count (-1 = none yet)
+				if (nextMinLocalCount == -1 || localCount < nextMinLocalCount) {
+					this.nextMinLocalCount = localCount;
+				}
+				this.splits.offerLast(split);
+			}
+		}
+
+		/**
+		 * Retrieves a LocatableInputSplit with minimum local count.
+		 * InputSplits which have already been assigned (i.e., which are not contained in the provided set) are filtered out.
+		 * The returned input split is NOT removed from the provided set.
+		 *
+		 * @param unassignedSplits Set of unassigned input splits.
+		 * @return An input split with minimum local count or null if all splits have been assigned.
+		 */
+		public LocatableInputSplitWithCount getNextUnassignedMinLocalCountSplit(Set<LocatableInputSplitWithCount> unassignedSplits) {
+
+			if (this.splits.isEmpty()) {
+				return null;
+			}
+
+			do {
+				elementCycleCount--;
+				// take first split of the list
+				LocatableInputSplitWithCount split = splits.pollFirst();
+				if (unassignedSplits.contains(split)) {
+					int localCount = split.getLocalCount();
+					// still unassigned, check local count
+					if (localCount > minLocalCount) {
+						// re-insert at end of the list and continue to look for split with smaller local count
+						splits.offerLast(split);
+						// check and update second smallest local count
+						if (nextMinLocalCount == -1 || localCount < nextMinLocalCount) {
+							nextMinLocalCount = localCount;
+						}
+						split = null;
+					}
+				} else {
+					// split was already assigned; it is dropped from this list
+					split = null;
+				}
+				if (elementCycleCount == 0) {
+					// the current pass is complete: continue with the second smallest count
+					// and inspect the whole remaining list in the next pass
+					minLocalCount = nextMinLocalCount;
+					nextMinLocalCount = -1;
+					elementCycleCount = splits.size();
+				}
+				if (split != null) {
+					// found a split to assign
+					return split;
+				}
+			} while (elementCycleCount > 0);
+
+			// no split left
+			return null;
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e0a4ee07/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
index a962129..7edad43 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
@@ -20,7 +20,10 @@ package org.apache.flink.core.io;
 
 import static org.junit.Assert.*;
 
+import java.util.Arrays;
+import java.util.Calendar;
 import java.util.HashSet;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -132,17 +135,140 @@ public class LocatableSplitAssignerTest {
 	}
 	
 	@Test
+	public void testSerialSplitAssignmentSomeForRemoteHost() {
+		try {
+			// requests arrive round-robin from host1, host2 and host3 (20 each for 60 splits):
+			// host1 reads all local
+			// host2 reads 10 local and 10 remote
+			// host3 reads all remote
+			final String[] hosts = { "host1", "host2", "host3" };
+			final int NUM_LOCAL_HOST1_SPLITS = 20;
+			final int NUM_LOCAL_HOST2_SPLITS = 10;
+			final int NUM_REMOTE_SPLITS = 30;
+			final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + NUM_LOCAL_HOST2_SPLITS;
+
+			// create splits local to host1, local to host2, and local only to a host that never requests
+			int splitCnt = 0;
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			// host1 splits
+			for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(splitCnt++, "host1"));
+			}
+			// host2 splits
+			for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(splitCnt++, "host2"));
+			}
+			// "remoteHost" never requests, so these splits are remote for every requesting host
+			for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
+			}
+
+			// request splits round-robin until none are left; each returned split must be a new one
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			InputSplit is = null;
+			int i = 0;
+			while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) {
+				assertTrue(splits.remove(is));
+			}
+
+			// check we had all; the counts below only hold if host1/host2 splits were not handed out remotely
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit("anotherHost"));
+
+			assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
+			assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSerialSplitAssignmentMultiLocalHost() {
+		try {
+			// local1/local2/local3 must get the splits local to 1/2/3 hosts, "other" the remote ones
+			final String[] localHosts = { "local1", "local2", "local3" };
+			final String[] remoteHosts = { "remote1", "remote2", "remote3" };
+			final String[] requestingHosts = { "local3", "local2", "local1", "other" };
+
+			final int NUM_THREE_LOCAL_SPLITS = 10;
+			final int NUM_TWO_LOCAL_SPLITS = 10;
+			final int NUM_ONE_LOCAL_SPLITS = 10;
+			final int NUM_LOCAL_SPLITS = NUM_THREE_LOCAL_SPLITS + NUM_TWO_LOCAL_SPLITS + NUM_ONE_LOCAL_SPLITS;
+			final int NUM_REMOTE_SPLITS = 10;
+			final int NUM_SPLITS = NUM_LOCAL_SPLITS + NUM_REMOTE_SPLITS;
+
+			String[] threeLocalHosts = localHosts;
+			String[] twoLocalHosts = {localHosts[0], localHosts[1], remoteHosts[0]};
+			String[] oneLocalHost = {localHosts[0], remoteHosts[0], remoteHosts[1]};
+			String[] noLocalHost = remoteHosts;
+
+			int splitCnt = 0;
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			// add splits with three local hosts
+			for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(splitCnt++, threeLocalHosts));
+			}
+			// add splits with two local hosts
+			for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
+			}
+			// add splits with one local host
+			for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
+			}
+			// add splits with no local host
+			for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
+			}
+
+			// request all splits round-robin from local3, local2, local1 and other
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			LocatableInputSplit is = null;
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				String host = requestingHosts[i % requestingHosts.length];
+				is = ia.getNextInputSplit(host);
+				// check valid split
+				assertNotNull(is);
+				// check unassigned split
+				assertTrue(splits.remove(is));
+				// check priority: a host must receive a local split that is local to as few hosts as possible
+				if (host.equals(localHosts[0])) {
+					assertTrue(Arrays.equals(is.getHostnames(), oneLocalHost));
+				} else if (host.equals(localHosts[1])) {
+					assertTrue(Arrays.equals(is.getHostnames(), twoLocalHosts));
+				} else if (host.equals(localHosts[2])) {
+					assertTrue(Arrays.equals(is.getHostnames(), threeLocalHosts));
+				} else {
+					assertTrue(Arrays.equals(is.getHostnames(), noLocalHost));
+				}
+			}
+			// check we had all
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit("anotherHost"));
+
+			assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
+			assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testSerialSplitAssignmentMixedLocalHost() {
 		try {
 			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
 			final int NUM_SPLITS = 10 * hosts.length;
-			
+
 			// load some splits
 			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
 			for (int i = 0; i < NUM_SPLITS; i++) {
 				splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
 			}
-			
+
 			// get all available splits
 			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
 			InputSplit is = null;
@@ -150,11 +276,11 @@ public class LocatableSplitAssignerTest {
 			while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) {
 				assertTrue(splits.remove(is));
 			}
-			
+
 			// check we had all
 			assertTrue(splits.isEmpty());
 			assertNull(ia.getNextInputSplit("anotherHost"));
-			
+
 			assertEquals(0, ia.getNumberOfRemoteAssignments());
 			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
 		}
@@ -380,4 +506,48 @@ public class LocatableSplitAssignerTest {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testAssignmentOfManySplitsRandomly() {
+		// the seed is added to every assertion message so that failures can be reproduced
+		long seed = Calendar.getInstance().getTimeInMillis();
+
+		final int NUM_SPLITS = 65536;
+		final String[] splitHosts = new String[256];
+		final String[] requestingHosts = new String[256];
+		final Random rand = new Random(seed);
+		// split hosts are localHost0..255; even-numbered requesters are split hosts, odd-numbered ones are not
+		for (int i = 0; i < splitHosts.length; i++) {
+			splitHosts[i] = "localHost" + i;
+		}
+		for (int i = 0; i < requestingHosts.length; i++) {
+			if (i % 2 == 0) {
+				requestingHosts[i] = "localHost" + i;
+			} else {
+				requestingHosts[i] = "remoteHost" + i;
+			}
+		}
+		// create splits, each local to 3 distinct randomly chosen split hosts
+		String[] stringArray = {};
+		Set<String> hosts = new HashSet<String>();
+		Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+		for (int i = 0; i < NUM_SPLITS; i++) {
+			while (hosts.size() < 3) {
+				hosts.add(splitHosts[rand.nextInt(splitHosts.length)]);
+			}
+			splits.add(new LocatableInputSplit(i, hosts.toArray(stringArray)));
+			hosts.clear();
+		}
+
+		final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+
+		for (int i = 0; i < NUM_SPLITS; i++) {
+			LocatableInputSplit split = ia.getNextInputSplit(requestingHosts[rand.nextInt(requestingHosts.length)]);
+			assertNotNull("No split returned (random seed " + seed + ")", split);
+			assertTrue("Unknown or already assigned split (random seed " + seed + ")", splits.remove(split));
+		}
+
+		assertTrue("Not all splits assigned (random seed " + seed + ")", splits.isEmpty());
+		assertNull("Split returned after exhaustion (random seed " + seed + ")", ia.getNextInputSplit("testHost"));
+	}
 }