You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/30 14:24:15 UTC

svn commit: r1378907 - in /giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java

Author: apresta
Date: Thu Aug 30 12:24:15 2012
New Revision: 1378907

URL: http://svn.apache.org/viewvc?rev=1378907&view=rev
Log:
GIRAPH-318: New Iterator in LocalityInfoSorter is not working. (Eli Reisman via apresta)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1378907&r1=1378906&r2=1378907&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Aug 30 12:24:15 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-318: New Iterator in LocalityInfoSorter is not working.
+  (Eli Reisman via apresta)
+
   GIRAPH-317: Add subpackages to comm (Maja Kabiljo via ereisman)
 
   GIRAPH-301: InputSplit Reservations are clumping, leaving many workers

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java?rev=1378907&r1=1378906&r2=1378907&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java Thu Aug 30 12:24:15 2012
@@ -53,7 +53,7 @@ public class LocalityInfoSorter implemen
   /** The worker's hostname */
   private final String hostName;
   /** The adjusted base offset by which to iterate on the path list */
-  private final int baseOffset;
+  private int baseOffset;
 
   /**
    * Constructor
@@ -67,12 +67,8 @@ public class LocalityInfoSorter implemen
     this.zooKeeper = zooKeeper;
     this.pathList = pathList;
     this.hostName = hostName;
-    // determine the hash-based offset for this worker to iterate from
-    // and place the local blocks into the list at that index, if any
-    final int temp = hostName.hashCode() + (19 * port);
-    baseOffset =
-      Math.abs(temp == Integer.MIN_VALUE ? 0 : temp) % pathList.size();
-    prioritizeLocalInputSplits();
+    this.baseOffset = 0; // set later after switching out local paths
+    prioritizeLocalInputSplits(port);
   }
 
  /**
@@ -81,12 +77,14 @@ public class LocalityInfoSorter implemen
   * a split to read. This will increase locality of data reads with greater
   * probability as the % of total nodes in the cluster hosting data and workers
   * BOTH increase towards 100%. Replication increases our chances of a "hit."
+  * @param port the port number for hashing unique iteration indexes for all
+  *             workers, even those sharing the same host node.
   */
-  private void prioritizeLocalInputSplits() {
+  private void prioritizeLocalInputSplits(final int port) {
     List<String> sortedList = new ArrayList<String>();
     String hosts = null;
-    for (int index = 0; index < pathList.size(); ++index) {
-      final String path = pathList.get(index);
+    for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
+      final String path = iterator.next();
       try {
         hosts = getLocationsFromZkInputSplitData(path);
       } catch (IOException ioe) {
@@ -98,12 +96,18 @@ public class LocalityInfoSorter implemen
       }
       if (hosts != null && hosts.contains(hostName)) {
         sortedList.add(path); // collect the local block
-        pathList.remove(index); // remove local block from list
+        iterator.remove(); // remove local block from list
       }
     }
     // shuffle the local blocks in case several workers exist on this host
     Collections.shuffle(sortedList);
-    pathList.addAll(baseOffset, sortedList); // re-insert local blocks
+    // determine the hash-based offset for this worker to iterate from
+    // and place the local blocks into the list at that index, if any
+    final int temp = hostName.hashCode() + (19 * port);
+    baseOffset =
+      Math.abs(temp == Integer.MIN_VALUE ? 0 : temp) % pathList.size();
+    // re-insert local paths at "adjusted index zero" for caller to iterate on
+    pathList.addAll(baseOffset, sortedList);
   }
 
   /**