You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@giraph.apache.org by er...@apache.org on 2013/01/11 01:28:03 UTC

git commit: GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman)

Updated Branches:
  refs/heads/trunk 9805d231a -> a5bc5bb35


GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a5bc5bb3
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a5bc5bb3
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a5bc5bb3

Branch: refs/heads/trunk
Commit: a5bc5bb35a84a2c61623da9269842985e7cc9da5
Parents: 9805d23
Author: Eli Reisman <er...@apache.org>
Authored: Thu Jan 10 16:16:58 2013 -0800
Committer: Eli Reisman <er...@apache.org>
Committed: Thu Jan 10 16:16:58 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../org/apache/giraph/worker/BspServiceWorker.java |    2 +-
 .../giraph/worker/EdgeInputSplitsCallable.java     |    4 +-
 .../worker/EdgeInputSplitsCallableFactory.java     |    3 +-
 .../giraph/worker/InputSplitPathOrganizer.java     |   30 +++++++++------
 .../apache/giraph/worker/InputSplitsCallable.java  |    5 ++-
 .../giraph/worker/InputSplitsCallableFactory.java  |    3 +-
 .../giraph/worker/VertexInputSplitsCallable.java   |    4 +-
 .../worker/VertexInputSplitsCallableFactory.java   |    3 +-
 .../test/java/org/apache/giraph/TestBspBasic.java  |   18 ++++----
 10 files changed, 46 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 6ec0579..d365bfa 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman)
+
   GIRAPH-478: Bring back jar-with-deps for giraph-hcatalog (nitay)
 
   GIRAPH-474: Add an oprtion not to use direct byte buffers. (majakabiljo via ereisman)

http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index f33fe58..31a4dc6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -264,7 +264,7 @@ public class BspServiceWorker<I extends WritableComparable,
     }
     for (int i = 0; i < numThreads; ++i) {
       Callable<VertexEdgeCount> inputSplitsCallable =
-          inputSplitsCallableFactory.newCallable();
+          inputSplitsCallableFactory.newCallable(i);
       threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 23e2ff7..7d40dfb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -75,6 +75,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
    * @param bspServiceWorker service worker
    * @param inputSplitPathList List of the paths of the input splits
    * @param workerInfo This worker's info
+   * @param threadId Id of input split thread
    * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public EdgeInputSplitsCallable(
@@ -84,9 +85,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
       List<String> inputSplitPathList,
       WorkerInfo workerInfo,
+      int threadId,
       ZooKeeperExt zooKeeperExt)  {
     super(context, graphState, configuration, bspServiceWorker,
-        inputSplitPathList, workerInfo, zooKeeperExt,
+        inputSplitPathList, workerInfo, threadId, zooKeeperExt,
         BspServiceWorker.EDGE_INPUT_SPLIT_RESERVED_NODE,
         BspServiceWorker.EDGE_INPUT_SPLIT_FINISHED_NODE,
         bspServiceWorker.getEdgeInputSplitsEvents());

http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 1a9a744..1adcd73 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -82,7 +82,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
   }
 
   @Override
-  public InputSplitsCallable<I, V, E, M> newCallable() {
+  public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
     return new EdgeInputSplitsCallable<I, V, E, M>(
         context,
         graphState,
@@ -90,6 +90,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
         bspServiceWorker,
         inputSplitPathList,
         workerInfo,
+        threadId,
         zooKeeperExt);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
index b82da7d..21e59bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
@@ -17,17 +17,19 @@
  */
 package org.apache.giraph.worker;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Iterator;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
+import java.util.List;
 
 /**
  * Utility class to extract the list of InputSplits from the
@@ -57,12 +59,14 @@ public class InputSplitPathOrganizer implements Iterable<String> {
    * @param zkPathList the path to read from
    * @param hostName the worker's host name (for matching)
    * @param port the port number for this worker
+   * @param threadId id of the input split thread
    */
   public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
-    final String zkPathList, final String hostName, final int port)
+    final String zkPathList, final String hostName, final int port,
+    final int threadId)
     throws KeeperException, InterruptedException {
     this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
-        hostName, port);
+        hostName, port, threadId);
   }
 
   /**
@@ -72,16 +76,17 @@ public class InputSplitPathOrganizer implements Iterable<String> {
    * @param inputSplitPathList path of input splits to read from
    * @param hostName the worker's host name (for matching)
    * @param port the port number for this worker
+   * @param threadId id of the input split thread
    */
   public InputSplitPathOrganizer(
       final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
-      final String hostName, final int port)
+      final String hostName, final int port, final int threadId)
     throws KeeperException, InterruptedException {
     this.zooKeeper = zooKeeper;
     this.pathList = Lists.newArrayList(inputSplitPathList);
     this.hostName = hostName;
     this.baseOffset = 0; // set later after switching out local paths
-    prioritizeLocalInputSplits(port);
+    prioritizeLocalInputSplits(port, threadId);
   }
 
  /**
@@ -93,10 +98,11 @@ public class InputSplitPathOrganizer implements Iterable<String> {
   *
   * @param port the port number for hashing unique iteration indexes for all
   *             workers, even those sharing the same host node.
+  * @param threadId id of the input split thread
   */
-  private void prioritizeLocalInputSplits(final int port) {
+  private void prioritizeLocalInputSplits(final int port, final int threadId) {
     List<String> sortedList = new ArrayList<String>();
-    String hosts = null;
+    String hosts;
     for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
       final String path = iterator.next();
       try {
@@ -117,9 +123,9 @@ public class InputSplitPathOrganizer implements Iterable<String> {
     Collections.shuffle(sortedList);
     // determine the hash-based offset for this worker to iterate from
     // and place the local blocks into the list at that index, if any
-    final int temp = hostName.hashCode() + (19 * port);
+    final int hashOffset = Objects.hashCode(hostName, port, threadId);
     if (pathList.size() != 0) {
-      baseOffset = Math.abs(temp % pathList.size());
+      baseOffset = Math.abs(hashOffset % pathList.size());
     }
     // re-insert local paths at "adjusted index zero" for caller to iterate on
     pathList.addAll(baseOffset, sortedList);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index ec4780e..0f5cdd4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -103,6 +103,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * @param bspServiceWorker service worker
    * @param inputSplitPathList List of the paths of the input splits
    * @param workerInfo This worker's info
+   * @param threadId Id of input split thread
    * @param zooKeeperExt Handle to ZooKeeperExt
    * @param inputSplitReservedNode Path to input split reserved
    * @param inputSplitFinishedNode Path to input split finsished
@@ -115,6 +116,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
       List<String> inputSplitPathList,
       WorkerInfo workerInfo,
+      int threadId,
       ZooKeeperExt zooKeeperExt,
       String inputSplitReservedNode,
       String inputSplitFinishedNode,
@@ -130,7 +132,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
         null);
     try {
       splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
-          inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort());
+          inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort(),
+          threadId);
     } catch (KeeperException e) {
       throw new IllegalStateException(
           "InputSplitsCallable: KeeperException", e);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
index 9e8bc32..cdc6543 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
@@ -34,7 +34,8 @@ public interface InputSplitsCallableFactory<I extends WritableComparable,
   /**
    * Return a newly-created {@link InputSplitsCallable}.
    *
+   * @param threadId Id of input split thread
    * @return A new {@link InputSplitsCallable}
    */
-  InputSplitsCallable<I, V, E, M> newCallable();
+  InputSplitsCallable<I, V, E, M> newCallable(int threadId);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 83c8b41..7522027 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -82,6 +82,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    * @param bspServiceWorker service worker
    * @param inputSplitPathList List of the paths of the input splits
    * @param workerInfo This worker's info
+   * @param threadId Id of input split thread
    * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public VertexInputSplitsCallable(
@@ -91,9 +92,10 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
       List<String> inputSplitPathList,
       WorkerInfo workerInfo,
+      int threadId,
       ZooKeeperExt zooKeeperExt)  {
     super(context, graphState, configuration, bspServiceWorker,
-        inputSplitPathList, workerInfo, zooKeeperExt,
+        inputSplitPathList, workerInfo, threadId, zooKeeperExt,
         BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE,
         BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE,
         bspServiceWorker.getVertexInputSplitsEvents());

http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index 4bec931..0d617dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -82,7 +82,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   }
 
   @Override
-  public InputSplitsCallable<I, V, E, M> newCallable() {
+  public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
     return new VertexInputSplitsCallable<I, V, E, M>(
         context,
         graphState,
@@ -90,6 +90,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
         bspServiceWorker,
         inputSplitPathList,
         workerInfo,
+        threadId,
         zooKeeperExt);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
index 91d536e..56ee5a9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -342,9 +342,9 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
   public void testInputSplitPathOrganizer()
     throws IOException, KeeperException, InterruptedException {
     final List<String> goodList = new ArrayList<String>();
-    Collections.addAll(goodList, "good", "bad", "ugly");
+    Collections.addAll(goodList, "local", "remote1", "remote2");
     final List<String> testList = new ArrayList<String>();
-    Collections.addAll(testList, "bad", "good", "ugly");
+    Collections.addAll(testList, "remote2", "local", "remote1");
     final String localHost = "node.LOCAL.com";
     final String testListName = "test_list_parent_znode";
     // build output just as we do to store hostlists in ZNODES
@@ -352,24 +352,24 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
     DataOutputStream dos = new DataOutputStream(baos);
     String last = "node.test4.com\tnode.test5.com\tnode.test6.com";
     Text.writeString(dos, last);
-    byte[] LOCALITY_LAST = baos.toByteArray();
+    byte[] remote1 = baos.toByteArray();
     baos = new ByteArrayOutputStream();
     dos = new DataOutputStream(baos);
     String middle = "node.test1.com\tnode.test2.com\tnode.test3.com";
     Text.writeString(dos, middle);
-    byte[] LOCALITY_MIDDLE = baos.toByteArray();
+    byte[] remote2 = baos.toByteArray();
     baos = new ByteArrayOutputStream();
     dos = new DataOutputStream(baos);
     String first = "node.testx.com\tnode.LOCAL.com\tnode.testy.com";
     Text.writeString(dos, first);
-    byte[] LOCALITY_FIRST = baos.toByteArray();
+    byte[] local = baos.toByteArray();
     ZooKeeperExt zk = mock(ZooKeeperExt.class);
     when(zk.getChildrenExt(testListName, false, false, true)).thenReturn(testList);
-    when(zk.getData("ugly", false, null)).thenReturn(LOCALITY_LAST);
-    when(zk.getData("bad", false, null)).thenReturn(LOCALITY_MIDDLE);
-    when(zk.getData("good", false, null)).thenReturn(LOCALITY_FIRST);
+    when(zk.getData("remote1", false, null)).thenReturn(remote1);
+    when(zk.getData("remote2", false, null)).thenReturn(remote2);
+    when(zk.getData("local", false, null)).thenReturn(local);
     InputSplitPathOrganizer lis =
-      new InputSplitPathOrganizer(zk, testListName, localHost, 0);
+      new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0);
     final List<String> resultList = new ArrayList<String>();
     for (String next : lis) {
       resultList.add(next);