You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2013/01/11 01:28:03 UTC
git commit: GIRAPH-473: InputSplitPathOrganizer should be aware of
multiple threads (apresta via ereisman)
Updated Branches:
refs/heads/trunk 9805d231a -> a5bc5bb35
GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a5bc5bb3
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a5bc5bb3
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a5bc5bb3
Branch: refs/heads/trunk
Commit: a5bc5bb35a84a2c61623da9269842985e7cc9da5
Parents: 9805d23
Author: Eli Reisman <er...@apache.org>
Authored: Thu Jan 10 16:16:58 2013 -0800
Committer: Eli Reisman <er...@apache.org>
Committed: Thu Jan 10 16:16:58 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../org/apache/giraph/worker/BspServiceWorker.java | 2 +-
.../giraph/worker/EdgeInputSplitsCallable.java | 4 +-
.../worker/EdgeInputSplitsCallableFactory.java | 3 +-
.../giraph/worker/InputSplitPathOrganizer.java | 30 +++++++++------
.../apache/giraph/worker/InputSplitsCallable.java | 5 ++-
.../giraph/worker/InputSplitsCallableFactory.java | 3 +-
.../giraph/worker/VertexInputSplitsCallable.java | 4 +-
.../worker/VertexInputSplitsCallableFactory.java | 3 +-
.../test/java/org/apache/giraph/TestBspBasic.java | 18 ++++----
10 files changed, 46 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 6ec0579..d365bfa 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman)
+
GIRAPH-478: Bring back jar-with-deps for giraph-hcatalog (nitay)
GIRAPH-474: Add an option not to use direct byte buffers. (majakabiljo via ereisman)
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index f33fe58..31a4dc6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -264,7 +264,7 @@ public class BspServiceWorker<I extends WritableComparable,
}
for (int i = 0; i < numThreads; ++i) {
Callable<VertexEdgeCount> inputSplitsCallable =
- inputSplitsCallableFactory.newCallable();
+ inputSplitsCallableFactory.newCallable(i);
threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 23e2ff7..7d40dfb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -75,6 +75,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
* @param bspServiceWorker service worker
* @param inputSplitPathList List of the paths of the input splits
* @param workerInfo This worker's info
+ * @param threadId Id of input split thread
* @param zooKeeperExt Handle to ZooKeeperExt
*/
public EdgeInputSplitsCallable(
@@ -84,9 +85,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
BspServiceWorker<I, V, E, M> bspServiceWorker,
List<String> inputSplitPathList,
WorkerInfo workerInfo,
+ int threadId,
ZooKeeperExt zooKeeperExt) {
super(context, graphState, configuration, bspServiceWorker,
- inputSplitPathList, workerInfo, zooKeeperExt,
+ inputSplitPathList, workerInfo, threadId, zooKeeperExt,
BspServiceWorker.EDGE_INPUT_SPLIT_RESERVED_NODE,
BspServiceWorker.EDGE_INPUT_SPLIT_FINISHED_NODE,
bspServiceWorker.getEdgeInputSplitsEvents());
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 1a9a744..1adcd73 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -82,7 +82,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
}
@Override
- public InputSplitsCallable<I, V, E, M> newCallable() {
+ public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
return new EdgeInputSplitsCallable<I, V, E, M>(
context,
graphState,
@@ -90,6 +90,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
bspServiceWorker,
inputSplitPathList,
workerInfo,
+ threadId,
zooKeeperExt);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
index b82da7d..21e59bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
@@ -17,17 +17,19 @@
*/
package org.apache.giraph.worker;
+import com.google.common.base.Objects;
import com.google.common.collect.Lists;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Iterator;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
+import java.util.List;
/**
* Utility class to extract the list of InputSplits from the
@@ -57,12 +59,14 @@ public class InputSplitPathOrganizer implements Iterable<String> {
* @param zkPathList the path to read from
* @param hostName the worker's host name (for matching)
* @param port the port number for this worker
+ * @param threadId id of the input split thread
*/
public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
- final String zkPathList, final String hostName, final int port)
+ final String zkPathList, final String hostName, final int port,
+ final int threadId)
throws KeeperException, InterruptedException {
this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
- hostName, port);
+ hostName, port, threadId);
}
/**
@@ -72,16 +76,17 @@ public class InputSplitPathOrganizer implements Iterable<String> {
* @param inputSplitPathList path of input splits to read from
* @param hostName the worker's host name (for matching)
* @param port the port number for this worker
+ * @param threadId id of the input split thread
*/
public InputSplitPathOrganizer(
final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
- final String hostName, final int port)
+ final String hostName, final int port, final int threadId)
throws KeeperException, InterruptedException {
this.zooKeeper = zooKeeper;
this.pathList = Lists.newArrayList(inputSplitPathList);
this.hostName = hostName;
this.baseOffset = 0; // set later after switching out local paths
- prioritizeLocalInputSplits(port);
+ prioritizeLocalInputSplits(port, threadId);
}
/**
@@ -93,10 +98,11 @@ public class InputSplitPathOrganizer implements Iterable<String> {
*
* @param port the port number for hashing unique iteration indexes for all
* workers, even those sharing the same host node.
+ * @param threadId id of the input split thread
*/
- private void prioritizeLocalInputSplits(final int port) {
+ private void prioritizeLocalInputSplits(final int port, final int threadId) {
List<String> sortedList = new ArrayList<String>();
- String hosts = null;
+ String hosts;
for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
final String path = iterator.next();
try {
@@ -117,9 +123,9 @@ public class InputSplitPathOrganizer implements Iterable<String> {
Collections.shuffle(sortedList);
// determine the hash-based offset for this worker to iterate from
// and place the local blocks into the list at that index, if any
- final int temp = hostName.hashCode() + (19 * port);
+ final int hashOffset = Objects.hashCode(hostName, port, threadId);
if (pathList.size() != 0) {
- baseOffset = Math.abs(temp % pathList.size());
+ baseOffset = Math.abs(hashOffset % pathList.size());
}
// re-insert local paths at "adjusted index zero" for caller to iterate on
pathList.addAll(baseOffset, sortedList);
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index ec4780e..0f5cdd4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -103,6 +103,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* @param bspServiceWorker service worker
* @param inputSplitPathList List of the paths of the input splits
* @param workerInfo This worker's info
+ * @param threadId Id of input split thread
* @param zooKeeperExt Handle to ZooKeeperExt
* @param inputSplitReservedNode Path to input split reserved
* @param inputSplitFinishedNode Path to input split finished
@@ -115,6 +116,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
BspServiceWorker<I, V, E, M> bspServiceWorker,
List<String> inputSplitPathList,
WorkerInfo workerInfo,
+ int threadId,
ZooKeeperExt zooKeeperExt,
String inputSplitReservedNode,
String inputSplitFinishedNode,
@@ -130,7 +132,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
null);
try {
splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
- inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort());
+ inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort(),
+ threadId);
} catch (KeeperException e) {
throw new IllegalStateException(
"InputSplitsCallable: KeeperException", e);
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
index 9e8bc32..cdc6543 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
@@ -34,7 +34,8 @@ public interface InputSplitsCallableFactory<I extends WritableComparable,
/**
* Return a newly-created {@link InputSplitsCallable}.
*
+ * @param threadId Id of input split thread
* @return A new {@link InputSplitsCallable}
*/
- InputSplitsCallable<I, V, E, M> newCallable();
+ InputSplitsCallable<I, V, E, M> newCallable(int threadId);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 83c8b41..7522027 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -82,6 +82,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
* @param bspServiceWorker service worker
* @param inputSplitPathList List of the paths of the input splits
* @param workerInfo This worker's info
+ * @param threadId Id of input split thread
* @param zooKeeperExt Handle to ZooKeeperExt
*/
public VertexInputSplitsCallable(
@@ -91,9 +92,10 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
BspServiceWorker<I, V, E, M> bspServiceWorker,
List<String> inputSplitPathList,
WorkerInfo workerInfo,
+ int threadId,
ZooKeeperExt zooKeeperExt) {
super(context, graphState, configuration, bspServiceWorker,
- inputSplitPathList, workerInfo, zooKeeperExt,
+ inputSplitPathList, workerInfo, threadId, zooKeeperExt,
BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE,
BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE,
bspServiceWorker.getVertexInputSplitsEvents());
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index 4bec931..0d617dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -82,7 +82,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
}
@Override
- public InputSplitsCallable<I, V, E, M> newCallable() {
+ public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
return new VertexInputSplitsCallable<I, V, E, M>(
context,
graphState,
@@ -90,6 +90,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
bspServiceWorker,
inputSplitPathList,
workerInfo,
+ threadId,
zooKeeperExt);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
index 91d536e..56ee5a9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -342,9 +342,9 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
public void testInputSplitPathOrganizer()
throws IOException, KeeperException, InterruptedException {
final List<String> goodList = new ArrayList<String>();
- Collections.addAll(goodList, "good", "bad", "ugly");
+ Collections.addAll(goodList, "local", "remote1", "remote2");
final List<String> testList = new ArrayList<String>();
- Collections.addAll(testList, "bad", "good", "ugly");
+ Collections.addAll(testList, "remote2", "local", "remote1");
final String localHost = "node.LOCAL.com";
final String testListName = "test_list_parent_znode";
// build output just as we do to store hostlists in ZNODES
@@ -352,24 +352,24 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
DataOutputStream dos = new DataOutputStream(baos);
String last = "node.test4.com\tnode.test5.com\tnode.test6.com";
Text.writeString(dos, last);
- byte[] LOCALITY_LAST = baos.toByteArray();
+ byte[] remote1 = baos.toByteArray();
baos = new ByteArrayOutputStream();
dos = new DataOutputStream(baos);
String middle = "node.test1.com\tnode.test2.com\tnode.test3.com";
Text.writeString(dos, middle);
- byte[] LOCALITY_MIDDLE = baos.toByteArray();
+ byte[] remote2 = baos.toByteArray();
baos = new ByteArrayOutputStream();
dos = new DataOutputStream(baos);
String first = "node.testx.com\tnode.LOCAL.com\tnode.testy.com";
Text.writeString(dos, first);
- byte[] LOCALITY_FIRST = baos.toByteArray();
+ byte[] local = baos.toByteArray();
ZooKeeperExt zk = mock(ZooKeeperExt.class);
when(zk.getChildrenExt(testListName, false, false, true)).thenReturn(testList);
- when(zk.getData("ugly", false, null)).thenReturn(LOCALITY_LAST);
- when(zk.getData("bad", false, null)).thenReturn(LOCALITY_MIDDLE);
- when(zk.getData("good", false, null)).thenReturn(LOCALITY_FIRST);
+ when(zk.getData("remote1", false, null)).thenReturn(remote1);
+ when(zk.getData("remote2", false, null)).thenReturn(remote2);
+ when(zk.getData("local", false, null)).thenReturn(local);
InputSplitPathOrganizer lis =
- new InputSplitPathOrganizer(zk, testListName, localHost, 0);
+ new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0);
final List<String> resultList = new ArrayList<String>();
for (String next : lis) {
resultList.add(next);