You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/29 20:49:30 UTC

svn commit: r1378681 - in /giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/graph/BspServiceWorker.java src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java src/test/java/org/apache/giraph/TestBspBasic.java

Author: apresta
Date: Wed Aug 29 18:49:29 2012
New Revision: 1378681

URL: http://svn.apache.org/viewvc?rev=1378681&view=rev
Log:
GIRAPH-301: InputSplit Reservations are clumping, leaving many workers asleep while others process too many splits and get overloaded. (Eli Reisman via apresta)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
    giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1378681&r1=1378680&r2=1378681&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Aug 29 18:49:29 2012
@@ -2,6 +2,10 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-301: InputSplit Reservations are clumping, leaving many workers
+  asleep while others process too many splits and get overloaded.
+  (Eli Reisman via apresta)
+
   GIRAPH-313: Open Netty client and server on master. (majakabiljo via
   aching)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1378681&r1=1378680&r2=1378681&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Aug 29 18:49:29 2012
@@ -204,6 +204,15 @@ public class BspServiceWorker<I extends 
    * Try to reserve an InputSplit for loading.  While InputSplits exists that
    * are not finished, wait until they are.
    *
+   * NOTE: iterations on the InputSplit list only halt for each worker when it
+   * has scanned the entire list once and found every split marked RESERVED.
+   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
+   * allowing other iterating workers to claim its previously reserved splits.
+   * Only when the last worker left iterating on the list fails can a danger
+   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
+   * causes job failure, this is OK. As the failure model evolves, this
+   * behavior might need to change.
+   *
    * @return reserved InputSplit or null if no unfinished InputSplits exist
    * @throws KeeperException
    * @throws InterruptedException
@@ -217,27 +226,16 @@ public class BspServiceWorker<I extends 
       inputSplitCount = inputSplitPathList.size();
     }
     LocalityInfoSorter localitySorter = new LocalityInfoSorter(
-      getZkExt(), inputSplitPathList, getHostname());
-    inputSplitPathList =
-      localitySorter.getPrioritizedLocalInputSplits();
+      getZkExt(), inputSplitPathList, getHostname(), getWorkerInfo().getPort());
     String reservedInputSplitPath = null;
     Stat reservedStat = null;
     final Mapper<?, ?, ?, ?>.Context context = getContext();
     while (true) {
-      int finishedInputSplits = 0;
-      for (int i = 0; i < inputSplitPathList.size(); ++i) {
+      int reservedInputSplits = 0;
+      for (String nextSplitToClaim : localitySorter) {
         context.progress();
-        String tmpInputSplitFinishedPath =
-            inputSplitPathList.get(i) + INPUT_SPLIT_FINISHED_NODE;
-        reservedStat =
-            getZkExt().exists(tmpInputSplitFinishedPath, true);
-        if (reservedStat != null) {
-          ++finishedInputSplits;
-          continue;
-        }
-
         String tmpInputSplitReservedPath =
-            inputSplitPathList.get(i) + INPUT_SPLIT_RESERVED_NODE;
+            nextSplitToClaim + INPUT_SPLIT_RESERVED_NODE;
         reservedStat =
             getZkExt().exists(tmpInputSplitReservedPath, true);
         if (reservedStat == null) {
@@ -248,16 +246,16 @@ public class BspServiceWorker<I extends 
                 Ids.OPEN_ACL_UNSAFE,
                 CreateMode.EPHEMERAL,
                 false);
-            reservedInputSplitPath = inputSplitPathList.get(i);
+            reservedInputSplitPath = nextSplitToClaim;
             if (LOG.isInfoEnabled()) {
               float percentFinished =
-                  finishedInputSplits * 100.0f /
+                  reservedInputSplits * 100.0f /
                   inputSplitPathList.size();
               LOG.info("reserveInputSplit: Reserved input " +
                   "split path " + reservedInputSplitPath +
                   ", overall roughly " +
                   + percentFinished +
-                  "% input splits finished");
+                  "% input splits reserved");
             }
             return reservedInputSplitPath;
           } catch (KeeperException.NodeExistsException e) {
@@ -272,15 +270,17 @@ public class BspServiceWorker<I extends 
                 "reserveInputSplit: InterruptedException " +
                     "on reserve", e);
           }
+        } else {
+          ++reservedInputSplits;
         }
       }
       if (LOG.isInfoEnabled()) {
         LOG.info("reserveInputSplit: reservedPath = " +
-            reservedInputSplitPath + ", " + finishedInputSplits +
+            reservedInputSplitPath + ", " + reservedInputSplits +
             " of " + inputSplitPathList.size() +
             " InputSplits are finished.");
       }
-      if (finishedInputSplits == inputSplitPathList.size()) {
+      if (reservedInputSplits == inputSplitPathList.size()) {
         transferRegulator = null; // don't need this anymore
         return null;
       }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java?rev=1378681&r1=1378680&r2=1378681&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java Wed Aug 29 18:49:29 2012
@@ -21,7 +21,9 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Iterator;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
@@ -29,45 +31,62 @@ import org.apache.zookeeper.KeeperExcept
 /**
  * Utility class to extract InputSplit locality information
  * from znodes and to sort the InputSplit list for the worker
- * owning this object to select splits from.
+ * owning this object to favor local data block selection.
+ *
+ * This class also provides a hash-rotated index by which workers
+ * must index into their list of InputSplits. This will be especially
+ * helpful to those who do not find local blocks to read, and must
+ * contend with other workers for non-local splits in the list.
+ *
+ * Searching for splits using ZK reads is slowed by the fact that
+ * after each ZK write (to mark a split reserved or finished) the
+ * ZK quorum must be sync'd before pending read requests can be
+ * fulfilled. During InputSplit claiming, the writes are frequent on
+ * both reserved and finished node trees; the aim is to cut down on
+ * the number of ZK reads workers perform to locate an unclaimed node.
  */
-public class LocalityInfoSorter {
+public class LocalityInfoSorter implements Iterable<String> {
   /** The worker's local ZooKeeperExt ref */
   private final ZooKeeperExt zooKeeper;
   /** The List of InputSplit znode paths */
   private final List<String> pathList;
   /** The worker's hostname */
   private final String hostName;
+  /** The adjusted base offset by which to iterate on the path list */
+  private final int baseOffset;
 
   /**
    * Constructor
    * @param zooKeeper the worker's ZkExt
    * @param pathList the path to read from
    * @param hostName the worker's host name (for matching)
+   * @param port the port number for this worker
    */
-  public LocalityInfoSorter(ZooKeeperExt zooKeeper, List<String> pathList,
-    String hostName) {
+  public LocalityInfoSorter(final ZooKeeperExt zooKeeper,
+    List<String> pathList, final String hostName, final int port) {
     this.zooKeeper = zooKeeper;
     this.pathList = pathList;
     this.hostName = hostName;
+    // determine the hash-based offset for this worker to iterate from
+    // and place the local blocks into the list at that index, if any
+    final int temp = hostName.hashCode() + (19 * port);
+    baseOffset =
+      Math.abs(temp == Integer.MIN_VALUE ? 0 : temp) % pathList.size();
+    prioritizeLocalInputSplits();
   }
 
-  /**
-   * Re-order list of InputSplits so files local to this worker node's
-   * disk are the first it will iterate over when attempting to claim
-   * a split to read. This will increase locality of data reads with greater
-   * probability as the % of total nodes in the cluster hosting data and workers
-   * BOTH increase towards 100%. Replication increases our chances of a "hit."
-   *
-   * @return the pathList, with host-local splits sorted to the front.
-   */
-  public List<String> getPrioritizedLocalInputSplits() {
+ /**
+  * Re-order list of InputSplits so files local to this worker node's
+  * disk are the first it will iterate over when attempting to claim
+  * a split to read. This will increase locality of data reads with greater
+  * probability as the % of total nodes in the cluster hosting data and workers
+  * BOTH increase towards 100%. Replication increases our chances of a "hit."
+  */
+  private void prioritizeLocalInputSplits() {
     List<String> sortedList = new ArrayList<String>();
-    boolean prioritize;
     String hosts = null;
     for (int index = 0; index < pathList.size(); ++index) {
       final String path = pathList.get(index);
-      prioritize = false;
       try {
         hosts = getLocationsFromZkInputSplitData(path);
       } catch (IOException ioe) {
@@ -77,10 +96,14 @@ public class LocalityInfoSorter {
       } catch (InterruptedException ie) {
         hosts = null;
       }
-      prioritize = hosts == null ? false : hosts.contains(hostName);
-      sortedList.add(prioritize ? 0 : index, path);
+      if (hosts != null && hosts.contains(hostName)) {
+        sortedList.add(path); // collect the local block
+        pathList.remove(index); // remove local block from list
+      }
     }
-    return sortedList;
+    // shuffle the local blocks in case several workers exist on this host
+    Collections.shuffle(sortedList);
+    pathList.addAll(baseOffset, sortedList); // re-insert local blocks
   }
 
   /**
@@ -88,7 +111,7 @@ public class LocalityInfoSorter {
    *
    * @param zkSplitPath the input split path to attempt to read
    * ZNode locality data from for this InputSplit.
-   * @return an array of String hostnames from ZNode data, or throws
+   * @return a String of hostnames from ZNode data (read failures are thrown)
    */
   private String getLocationsFromZkInputSplitData(String zkSplitPath)
     throws IOException, KeeperException, InterruptedException {
@@ -98,4 +121,43 @@ public class LocalityInfoSorter {
     // only read the "first" entry in the znode data, the locations
     return Text.readString(inputStream);
   }
+
+  /**
+   * Iterator for the pathList
+   * @return an iterator for our list of input split paths
+   */
+  public Iterator<String> iterator() {
+    return new PathListIterator();
+  }
+
+  /**
+   * Iterator for path list that handles the locality and hash offsetting.
+   */
+  public class PathListIterator implements Iterator<String> {
+    /** the current iterator index */
+    private int currentIndex = 0;
+
+    /**
+     *  Do we have more list to iterate upon?
+     *  @return true if more path strings are available
+     */
+    @Override
+    public boolean hasNext() {
+      return currentIndex < pathList.size();
+    }
+
+    /** Return the next pathList element, starting at baseOffset and wrapping.
+     * @return the next input split path
+     */
+    @Override
+    public String next() {
+      return pathList.get((baseOffset + currentIndex++) % pathList.size());
+    }
+
+    /** Removal is unsupported; always throws UnsupportedOperationException. */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not allowed.");
+    }
+  }
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1378681&r1=1378680&r2=1378681&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Wed Aug 29 18:49:29 2012
@@ -328,8 +328,12 @@ public class TestBspBasic extends BspCas
     when(zk.getData("ugly", false, null)).thenReturn(LOCALITY_LAST);
     when(zk.getData("bad", false, null)).thenReturn(LOCALITY_MIDDLE);
     when(zk.getData("good", false, null)).thenReturn(LOCALITY_FIRST);
-    LocalityInfoSorter lis = new LocalityInfoSorter(zk, testList, localHost);
-    final List<String> resultList = lis.getPrioritizedLocalInputSplits();
+    LocalityInfoSorter lis =
+      new LocalityInfoSorter(zk, testList, localHost, 0);
+    final List<String> resultList = new ArrayList<String>();
+    for (String next : lis) {
+      resultList.add(next);
+    }
     assertEquals(goodList, resultList);
   }