You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/30 14:24:15 UTC
svn commit: r1378907 - in /giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
Author: apresta
Date: Thu Aug 30 12:24:15 2012
New Revision: 1378907
URL: http://svn.apache.org/viewvc?rev=1378907&view=rev
Log:
GIRAPH-318: New Iterator in LocalityInfoSorter is not working. (Eli Reisman via apresta)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1378907&r1=1378906&r2=1378907&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Aug 30 12:24:15 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-318: New Iterator in LocalityInfoSorter is not working.
+ (Eli Reisman via apresta)
+
GIRAPH-317: Add subpackages to comm (Maja Kabiljo via ereisman)
GIRAPH-301: InputSplit Reservations are clumping, leaving many workers
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java?rev=1378907&r1=1378906&r2=1378907&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java Thu Aug 30 12:24:15 2012
@@ -53,7 +53,7 @@ public class LocalityInfoSorter implemen
/** The worker's hostname */
private final String hostName;
/** The adjusted base offset by which to iterate on the path list */
- private final int baseOffset;
+ private int baseOffset;
/**
* Constructor
@@ -67,12 +67,8 @@ public class LocalityInfoSorter implemen
this.zooKeeper = zooKeeper;
this.pathList = pathList;
this.hostName = hostName;
- // determine the hash-based offset for this worker to iterate from
- // and place the local blocks into the list at that index, if any
- final int temp = hostName.hashCode() + (19 * port);
- baseOffset =
- Math.abs(temp == Integer.MIN_VALUE ? 0 : temp) % pathList.size();
- prioritizeLocalInputSplits();
+ this.baseOffset = 0; // set later after switching out local paths
+ prioritizeLocalInputSplits(port);
}
/**
@@ -81,12 +77,14 @@ public class LocalityInfoSorter implemen
* a split to read. This will increase locality of data reads with greater
* probability as the % of total nodes in the cluster hosting data and workers
* BOTH increase towards 100%. Replication increases our chances of a "hit."
+ * @param port the port number for hashing unique iteration indexes for all
+ * workers, even those sharing the same host node.
*/
- private void prioritizeLocalInputSplits() {
+ private void prioritizeLocalInputSplits(final int port) {
List<String> sortedList = new ArrayList<String>();
String hosts = null;
- for (int index = 0; index < pathList.size(); ++index) {
- final String path = pathList.get(index);
+ for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
+ final String path = iterator.next();
try {
hosts = getLocationsFromZkInputSplitData(path);
} catch (IOException ioe) {
@@ -98,12 +96,18 @@ public class LocalityInfoSorter implemen
}
if (hosts != null && hosts.contains(hostName)) {
sortedList.add(path); // collect the local block
- pathList.remove(index); // remove local block from list
+ iterator.remove(); // remove local block from list
}
}
// shuffle the local blocks in case several workers exist on this host
Collections.shuffle(sortedList);
- pathList.addAll(baseOffset, sortedList); // re-insert local blocks
+ // determine the hash-based offset for this worker to iterate from
+ // and place the local blocks into the list at that index, if any
+ final int temp = hostName.hashCode() + (19 * port);
+ baseOffset =
+ Math.abs(temp == Integer.MIN_VALUE ? 0 : temp) % pathList.size();
+ // re-insert local paths at "adjusted index zero" for caller to iterate on
+ pathList.addAll(baseOffset, sortedList);
}
/**