You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2013/01/11 06:18:36 UTC

git commit: GIRAPH-477: Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman)

Updated Branches:
  refs/heads/trunk 2b95451e1 -> e1a7f2905


GIRAPH-477: Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e1a7f290
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e1a7f290
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e1a7f290

Branch: refs/heads/trunk
Commit: e1a7f2905b7afc0d621f129a092b5c7fc14e97ab
Parents: 2b95451
Author: Eli Reisman <er...@apache.org>
Authored: Thu Jan 10 21:17:53 2013 -0800
Committer: Eli Reisman <er...@apache.org>
Committed: Thu Jan 10 21:17:53 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../org/apache/giraph/conf/GiraphConstants.java    |   14 +++++
 .../org/apache/giraph/master/BspServiceMaster.java |   40 ++++++++++-----
 .../giraph/worker/InputSplitPathOrganizer.java     |   41 +++++++++------
 .../apache/giraph/worker/InputSplitsCallable.java  |   14 ++++-
 .../test/java/org/apache/giraph/TestBspBasic.java  |    9 ++--
 6 files changed, 83 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 78f01db..6c2c5e6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-477: Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman)
+
   GIRAPH-459: Group Vertex Mutations by Partition ID (claudio)
 
   GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman)

http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 9acc50a..8e75e5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -419,6 +419,20 @@ public interface GiraphConstants {
    */
   long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1;
 
+  /**
+   * To minimize network usage when reading input splits,
+   * each worker can prioritize splits that reside on its host.
+   * This, however, comes at the cost of increased load on ZooKeeper.
+   * Hence, users with a lot of splits and input threads (or with
+   * configurations that can't exploit locality) may want to disable it.
+   */
+  String USE_INPUT_SPLIT_LOCALITY = "giraph.useInputSplitLocality";
+
+  /**
+   * Default is to prioritize local input splits.
+   */
+  boolean USE_INPUT_SPLIT_LOCALITY_DEFAULT = true;
+
   /** Java opts passed to ZooKeeper startup */
   String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts";
   /** Default java opts passed to ZooKeeper startup */

http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 4483385..33f9f4a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -602,9 +602,13 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     ExecutorService taskExecutor =
         Executors.newFixedThreadPool(inputSplitThreadCount);
+    boolean writeLocations = getConfiguration().getBoolean(
+        GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
+        GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
     for (int i = 0; i < splitList.size(); ++i) {
       InputSplit inputSplit = splitList.get(i);
-      taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i));
+      taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i,
+          writeLocations));
     }
     taskExecutor.shutdown();
     ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
@@ -1821,6 +1825,8 @@ public class BspServiceMaster<I extends WritableComparable,
     private final String inputSplitsPath;
     /** Index of the input split */
     private final int index;
+    /** Whether to write locality information */
+    private final boolean writeLocations;
 
     /**
      * Constructor
@@ -1828,13 +1834,18 @@ public class BspServiceMaster<I extends WritableComparable,
      * @param inputSplit Input split which we are going to write
      * @param inputSplitsPath Input splits path
      * @param index Index of the input split
+     * @param writeLocations whether to write the input split's locations (to
+     *                       be used by workers for prioritizing local splits
+     *                       when reading)
      */
     public WriteInputSplit(InputSplit inputSplit,
                            String inputSplitsPath,
-                           int index) {
+                           int index,
+                           boolean writeLocations) {
       this.inputSplit = inputSplit;
       this.inputSplitsPath = inputSplitsPath;
       this.index = index;
+      this.writeLocations = writeLocations;
     }
 
     @Override
@@ -1846,19 +1857,22 @@ public class BspServiceMaster<I extends WritableComparable,
         DataOutput outputStream =
             new DataOutputStream(byteArrayOutputStream);
 
-        String[] splitLocations = inputSplit.getLocations();
-        StringBuilder locations = null;
-        if (splitLocations != null) {
-          int splitListLength =
-              Math.min(splitLocations.length, localityLimit);
-          locations = new StringBuilder();
-          for (String location : splitLocations) {
-            locations.append(location)
-                .append(--splitListLength > 0 ? "\t" : "");
+        if (writeLocations) {
+          String[] splitLocations = inputSplit.getLocations();
+          StringBuilder locations = null;
+          if (splitLocations != null) {
+            int splitListLength =
+                Math.min(splitLocations.length, localityLimit);
+            locations = new StringBuilder();
+            for (String location : splitLocations) {
+              locations.append(location)
+                  .append(--splitListLength > 0 ? "\t" : "");
+            }
           }
+          Text.writeString(outputStream,
+              locations == null ? "" : locations.toString());
         }
-        Text.writeString(outputStream,
-            locations == null ? "" : locations.toString());
+
         Text.writeString(outputStream,
             inputSplit.getClass().getName());
         ((Writable) inputSplit).write(outputStream);

http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
index 21e59bd..f5b054d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
@@ -60,13 +60,14 @@ public class InputSplitPathOrganizer implements Iterable<String> {
    * @param hostName the worker's host name (for matching)
    * @param port the port number for this worker
    * @param threadId id of the input split thread
+   * @param useLocality whether to prioritize local input splits
    */
   public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
     final String zkPathList, final String hostName, final int port,
-    final int threadId)
+    final int threadId, final boolean useLocality)
     throws KeeperException, InterruptedException {
     this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
-        hostName, port, threadId);
+        hostName, port, threadId, useLocality);
   }
 
   /**
@@ -77,30 +78,44 @@ public class InputSplitPathOrganizer implements Iterable<String> {
    * @param hostName the worker's host name (for matching)
    * @param port the port number for this worker
    * @param threadId id of the input split thread
+   * @param useLocality whether to prioritize local input splits
    */
   public InputSplitPathOrganizer(
       final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
-      final String hostName, final int port, final int threadId)
+      final String hostName, final int port, final int threadId,
+      final boolean useLocality)
     throws KeeperException, InterruptedException {
     this.zooKeeper = zooKeeper;
     this.pathList = Lists.newArrayList(inputSplitPathList);
     this.hostName = hostName;
-    this.baseOffset = 0; // set later after switching out local paths
-    prioritizeLocalInputSplits(port, threadId);
+    this.baseOffset = computeBaseOffset(port, threadId);
+    if (useLocality) {
+      prioritizeLocalInputSplits();
+    }
+  }
+
+  /**
+   * Compute base offset to start iterating from,
+   * in order to avoid collisions with other workers/threads.
+   *
+   * @param port the port number for this worker
+   * @param threadId id of the input split thread
+   * @return the offset to start iterating from
+   */
+  private int computeBaseOffset(int port, int threadId) {
+    return pathList.isEmpty() ? 0 :
+        Math.abs(Objects.hashCode(hostName, port, threadId) % pathList.size());
   }
 
- /**
+  /**
   * Re-order list of InputSplits so files local to this worker node's
   * disk are the first it will iterate over when attempting to claim
   * a split to read. This will increase locality of data reads with greater
   * probability as the % of total nodes in the cluster hosting data and workers
   * BOTH increase towards 100%. Replication increases our chances of a "hit."
   *
-  * @param port the port number for hashing unique iteration indexes for all
-  *             workers, even those sharing the same host node.
-  * @param threadId id of the input split thread
   */
-  private void prioritizeLocalInputSplits(final int port, final int threadId) {
+  private void prioritizeLocalInputSplits() {
     List<String> sortedList = new ArrayList<String>();
     String hosts;
     for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
@@ -121,12 +136,6 @@ public class InputSplitPathOrganizer implements Iterable<String> {
     }
     // shuffle the local blocks in case several workers exist on this host
     Collections.shuffle(sortedList);
-    // determine the hash-based offset for this worker to iterate from
-    // and place the local blocks into the list at that index, if any
-    final int hashOffset = Objects.hashCode(hostName, port, threadId);
-    if (pathList.size() != 0) {
-      baseOffset = Math.abs(hashOffset % pathList.size());
-    }
     // re-insert local paths at "adjusted index zero" for caller to iterate on
     pathList.addAll(baseOffset, sortedList);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 0f5cdd4..acd4e2d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -18,9 +18,10 @@
 
 package org.apache.giraph.worker;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.VertexEdgeCount;
@@ -92,6 +93,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   private final String inputSplitFinishedNode;
   /** Input split events. */
   private final InputSplitEvents inputSplitEvents;
+  /** Whether to prioritize local input splits. */
+  private final boolean useLocality;
 
   // CHECKSTYLE: stop ParameterNumberCheck
   /**
@@ -130,10 +133,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
         graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
         context, graphState.getGraphMapper(), workerClientRequestProcessor,
         null);
+    this.useLocality = configuration.getBoolean(
+        GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
+        GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
     try {
       splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
           inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort(),
-          threadId);
+          threadId, this.useLocality);
     } catch (KeeperException e) {
       throw new IllegalStateException(
           "InputSplitsCallable: KeeperException", e);
@@ -377,7 +383,9 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
 
     DataInputStream inputStream =
         new DataInputStream(new ByteArrayInputStream(splitList));
-    Text.readString(inputStream); // location data unused here, skip
+    if (useLocality) {
+      Text.readString(inputStream); // location data unused here, skip
+    }
     String inputSplitClass = Text.readString(inputStream);
     InputSplit inputSplit = (InputSplit)
         ReflectionUtils.newInstance(

http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
index 56ee5a9..987f51c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -341,8 +341,6 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
   @Test
   public void testInputSplitPathOrganizer()
     throws IOException, KeeperException, InterruptedException {
-    final List<String> goodList = new ArrayList<String>();
-    Collections.addAll(goodList, "local", "remote1", "remote2");
     final List<String> testList = new ArrayList<String>();
     Collections.addAll(testList, "remote2", "local", "remote1");
     final String localHost = "node.LOCAL.com";
@@ -364,17 +362,18 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
     Text.writeString(dos, first);
     byte[] local = baos.toByteArray();
     ZooKeeperExt zk = mock(ZooKeeperExt.class);
-    when(zk.getChildrenExt(testListName, false, false, true)).thenReturn(testList);
+    when(zk.getChildrenExt(testListName, false, false, true)).
+        thenReturn(testList);
     when(zk.getData("remote1", false, null)).thenReturn(remote1);
     when(zk.getData("remote2", false, null)).thenReturn(remote2);
     when(zk.getData("local", false, null)).thenReturn(local);
     InputSplitPathOrganizer lis =
-      new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0);
+      new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0, true);
     final List<String> resultList = new ArrayList<String>();
     for (String next : lis) {
       resultList.add(next);
     }
-    assertEquals(goodList, resultList);
+    assertEquals("local", resultList.get(0));
   }
 
   /**