You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/29 20:49:30 UTC
svn commit: r1378681 - in /giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/graph/BspServiceWorker.java
src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
src/test/java/org/apache/giraph/TestBspBasic.java
Author: apresta
Date: Wed Aug 29 18:49:29 2012
New Revision: 1378681
URL: http://svn.apache.org/viewvc?rev=1378681&view=rev
Log:
GIRAPH-301: InputSplit Reservations are clumping, leaving many workers asleep while others process too many splits and get overloaded. (Eli Reisman via apresta)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1378681&r1=1378680&r2=1378681&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Aug 29 18:49:29 2012
@@ -2,6 +2,10 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-301: InputSplit Reservations are clumping, leaving many workers
+ asleep while others process too many splits and get overloaded.
+ (Eli Reisman via apresta)
+
GIRAPH-313: Open Netty client and server on master. (majakabiljo via
aching)
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1378681&r1=1378680&r2=1378681&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Aug 29 18:49:29 2012
@@ -204,6 +204,15 @@ public class BspServiceWorker<I extends
* Try to reserve an InputSplit for loading. While InputSplits exists that
* are not finished, wait until they are.
*
+ * NOTE: iterations on the InputSplit list only halt for each worker when it
+ * has scanned the entire list once and found every split marked RESERVED.
+ * When a worker fails, its Ephemeral RESERVED znodes will disappear,
+ * allowing other iterating workers to claim its previously read splits.
+ * Only when the last worker left iterating on the list fails can a danger
+ * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
+ * causes job failure, this is OK. As the failure model evolves, this
+ * behavior might need to change.
+ *
* @return reserved InputSplit or null if no unfinished InputSplits exist
* @throws KeeperException
* @throws InterruptedException
@@ -217,27 +226,16 @@ public class BspServiceWorker<I extends
inputSplitCount = inputSplitPathList.size();
}
LocalityInfoSorter localitySorter = new LocalityInfoSorter(
- getZkExt(), inputSplitPathList, getHostname());
- inputSplitPathList =
- localitySorter.getPrioritizedLocalInputSplits();
+ getZkExt(), inputSplitPathList, getHostname(), getWorkerInfo().getPort());
String reservedInputSplitPath = null;
Stat reservedStat = null;
final Mapper<?, ?, ?, ?>.Context context = getContext();
while (true) {
- int finishedInputSplits = 0;
- for (int i = 0; i < inputSplitPathList.size(); ++i) {
+ int reservedInputSplits = 0;
+ for (String nextSplitToClaim : localitySorter) {
context.progress();
- String tmpInputSplitFinishedPath =
- inputSplitPathList.get(i) + INPUT_SPLIT_FINISHED_NODE;
- reservedStat =
- getZkExt().exists(tmpInputSplitFinishedPath, true);
- if (reservedStat != null) {
- ++finishedInputSplits;
- continue;
- }
-
String tmpInputSplitReservedPath =
- inputSplitPathList.get(i) + INPUT_SPLIT_RESERVED_NODE;
+ nextSplitToClaim + INPUT_SPLIT_RESERVED_NODE;
reservedStat =
getZkExt().exists(tmpInputSplitReservedPath, true);
if (reservedStat == null) {
@@ -248,16 +246,16 @@ public class BspServiceWorker<I extends
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL,
false);
- reservedInputSplitPath = inputSplitPathList.get(i);
+ reservedInputSplitPath = nextSplitToClaim;
if (LOG.isInfoEnabled()) {
float percentFinished =
- finishedInputSplits * 100.0f /
+ reservedInputSplits * 100.0f /
inputSplitPathList.size();
LOG.info("reserveInputSplit: Reserved input " +
"split path " + reservedInputSplitPath +
", overall roughly " +
+ percentFinished +
- "% input splits finished");
+ "% input splits reserved");
}
return reservedInputSplitPath;
} catch (KeeperException.NodeExistsException e) {
@@ -272,15 +270,17 @@ public class BspServiceWorker<I extends
"reserveInputSplit: InterruptedException " +
"on reserve", e);
}
+ } else {
+ ++reservedInputSplits;
}
}
if (LOG.isInfoEnabled()) {
LOG.info("reserveInputSplit: reservedPath = " +
- reservedInputSplitPath + ", " + finishedInputSplits +
+ reservedInputSplitPath + ", " + reservedInputSplits +
" of " + inputSplitPathList.size() +
" InputSplits are finished.");
}
- if (finishedInputSplits == inputSplitPathList.size()) {
+ if (reservedInputSplits == inputSplitPathList.size()) {
transferRegulator = null; // don't need this anymore
return null;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java?rev=1378681&r1=1378680&r2=1378681&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java Wed Aug 29 18:49:29 2012
@@ -21,7 +21,9 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Iterator;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
@@ -29,45 +31,62 @@ import org.apache.zookeeper.KeeperExcept
/**
* Utility class to extract InputSplit locality information
* from znodes and to sort the InputSplit list for the worker
- * owning this object to select splits from.
+ * owning this object to favor local data block selection.
+ *
+ * This class also provides a hash-rotated index by which workers
+ * must index into their list of InputSplits. This will be especially
+ * helpful to those who do not find local blocks to read, and must
+ * contend with other workers for non-local splits in the list.
+ *
+ * Searching for splits using ZK reads is slowed by the fact that
+ * after each ZK write (to mark a split reserved or finished) the
+ * ZK quorum must be sync'd before pending read requests can be
+ * fulfilled. During InputSplit claiming, the writes are frequent on
+ * both reserved and finished node trees; the aim is to cut down on
+ * the number of ZK reads workers perform to locate an unclaimed node.
*/
-public class LocalityInfoSorter {
+public class LocalityInfoSorter implements Iterable<String> {
/** The worker's local ZooKeeperExt ref */
private final ZooKeeperExt zooKeeper;
/** The List of InputSplit znode paths */
private final List<String> pathList;
/** The worker's hostname */
private final String hostName;
+ /** The adjusted base offset by which to iterate on the path list */
+ private final int baseOffset;
/**
* Constructor
* @param zooKeeper the worker's ZkExt
* @param pathList the path to read from
* @param hostName the worker's host name (for matching)
+ * @param port the port number for this worker
*/
- public LocalityInfoSorter(ZooKeeperExt zooKeeper, List<String> pathList,
- String hostName) {
+ public LocalityInfoSorter(final ZooKeeperExt zooKeeper,
+ List<String> pathList, final String hostName, final int port) {
this.zooKeeper = zooKeeper;
this.pathList = pathList;
this.hostName = hostName;
+ // determine the hash-based offset for this worker to iterate from
+ // and place the local blocks into the list at that index, if any
+ final int temp = hostName.hashCode() + (19 * port);
+ baseOffset =
+ Math.abs(temp == Integer.MIN_VALUE ? 0 : temp) % pathList.size();
+ prioritizeLocalInputSplits();
}
- /**
- * Re-order list of InputSplits so files local to this worker node's
- * disk are the first it will iterate over when attempting to claim
- * a split to read. This will increase locality of data reads with greater
- * probability as the % of total nodes in the cluster hosting data and workers
- * BOTH increase towards 100%. Replication increases our chances of a "hit."
- *
- * @return the pathList, with host-local splits sorted to the front.
- */
- public List<String> getPrioritizedLocalInputSplits() {
+ /**
+ * Re-order list of InputSplits so files local to this worker node's
+ * disk are the first it will iterate over when attempting to claim
+ * a split to read. This will increase locality of data reads with greater
+ * probability as the % of total nodes in the cluster hosting data and workers
+ * BOTH increase towards 100%. Replication increases our chances of a "hit."
+ */
+ private void prioritizeLocalInputSplits() {
List<String> sortedList = new ArrayList<String>();
- boolean prioritize;
String hosts = null;
for (int index = 0; index < pathList.size(); ++index) {
final String path = pathList.get(index);
- prioritize = false;
try {
hosts = getLocationsFromZkInputSplitData(path);
} catch (IOException ioe) {
@@ -77,10 +96,14 @@ public class LocalityInfoSorter {
} catch (InterruptedException ie) {
hosts = null;
}
- prioritize = hosts == null ? false : hosts.contains(hostName);
- sortedList.add(prioritize ? 0 : index, path);
+ if (hosts != null && hosts.contains(hostName)) {
+ sortedList.add(path); // collect the local block
+ pathList.remove(index); // remove local block from list
+ }
}
- return sortedList;
+ // shuffle the local blocks in case several workers exist on this host
+ Collections.shuffle(sortedList);
+ pathList.addAll(baseOffset, sortedList); // re-insert local blocks
}
/**
@@ -88,7 +111,7 @@ public class LocalityInfoSorter {
*
* @param zkSplitPath the input split path to attempt to read
* ZNode locality data from for this InputSplit.
- * @return an array of String hostnames from ZNode data, or throws
+ * @return a String of hostnames from ZNode data, or throws
*/
private String getLocationsFromZkInputSplitData(String zkSplitPath)
throws IOException, KeeperException, InterruptedException {
@@ -98,4 +121,43 @@ public class LocalityInfoSorter {
// only read the "first" entry in the znode data, the locations
return Text.readString(inputStream);
}
+
+ /**
+ * Iterator for the pathList
+ * @return an iterator for our list of input split paths
+ */
+ public Iterator<String> iterator() {
+ return new PathListIterator();
+ }
+
+ /**
+ * Iterator for path list that handles the locality and hash offsetting.
+ */
+ public class PathListIterator implements Iterator<String> {
+ /** the current iterator index */
+ private int currentIndex = 0;
+
+ /**
+ * Do we have more list to iterate upon?
+ * @return true if more path strings are available
+ */
+ @Override
+ public boolean hasNext() {
+ return currentIndex < pathList.size();
+ }
+
+ /** return the next pathList element
+ * @return the next input split path
+ */
+ @Override
+ public String next() {
+ return pathList.get((baseOffset + currentIndex++) % pathList.size());
+ }
+
+ /** Just a placeholder; should not do anything! */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove is not allowed.");
+ }
+ }
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1378681&r1=1378680&r2=1378681&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Wed Aug 29 18:49:29 2012
@@ -328,8 +328,12 @@ public class TestBspBasic extends BspCas
when(zk.getData("ugly", false, null)).thenReturn(LOCALITY_LAST);
when(zk.getData("bad", false, null)).thenReturn(LOCALITY_MIDDLE);
when(zk.getData("good", false, null)).thenReturn(LOCALITY_FIRST);
- LocalityInfoSorter lis = new LocalityInfoSorter(zk, testList, localHost);
- final List<String> resultList = lis.getPrioritizedLocalInputSplits();
+ LocalityInfoSorter lis =
+ new LocalityInfoSorter(zk, testList, localHost, 0);
+ final List<String> resultList = new ArrayList<String>();
+ for (String next : lis) {
+ resultList.add(next);
+ }
assertEquals(goodList, resultList);
}