You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2013/01/11 06:18:36 UTC
git commit: GIRAPH-477: Fetching locality info in
InputSplitPathOrganizer causes jobs to hang (apresta via ereisman)
Updated Branches:
refs/heads/trunk 2b95451e1 -> e1a7f2905
GIRAPH-477: Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e1a7f290
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e1a7f290
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e1a7f290
Branch: refs/heads/trunk
Commit: e1a7f2905b7afc0d621f129a092b5c7fc14e97ab
Parents: 2b95451
Author: Eli Reisman <er...@apache.org>
Authored: Thu Jan 10 21:17:53 2013 -0800
Committer: Eli Reisman <er...@apache.org>
Committed: Thu Jan 10 21:17:53 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../org/apache/giraph/conf/GiraphConstants.java | 14 +++++
.../org/apache/giraph/master/BspServiceMaster.java | 40 ++++++++++-----
.../giraph/worker/InputSplitPathOrganizer.java | 41 +++++++++------
.../apache/giraph/worker/InputSplitsCallable.java | 14 ++++-
.../test/java/org/apache/giraph/TestBspBasic.java | 9 ++--
6 files changed, 83 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 78f01db..6c2c5e6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-477: Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman)
+
GIRAPH-459: Group Vertex Mutations by Partition ID (claudio)
GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman)
http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 9acc50a..8e75e5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -419,6 +419,20 @@ public interface GiraphConstants {
*/
long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1;
+ /**
+ * To minimize network usage when reading input splits,
+ * each worker can prioritize splits that reside on its host.
+ * This, however, comes at the cost of increased load on ZooKeeper.
+ * Hence, users with a lot of splits and input threads (or with
+ * configurations that can't exploit locality) may want to disable it.
+ */
+ String USE_INPUT_SPLIT_LOCALITY = "giraph.useInputSplitLocality";
+
+ /**
+ * Default is to prioritize local input splits.
+ */
+ boolean USE_INPUT_SPLIT_LOCALITY_DEFAULT = true;
+
/** Java opts passed to ZooKeeper startup */
String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts";
/** Default java opts passed to ZooKeeper startup */
http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 4483385..33f9f4a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -602,9 +602,13 @@ public class BspServiceMaster<I extends WritableComparable,
}
ExecutorService taskExecutor =
Executors.newFixedThreadPool(inputSplitThreadCount);
+ boolean writeLocations = getConfiguration().getBoolean(
+ GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
+ GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
for (int i = 0; i < splitList.size(); ++i) {
InputSplit inputSplit = splitList.get(i);
- taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i));
+ taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i,
+ writeLocations));
}
taskExecutor.shutdown();
ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
@@ -1821,6 +1825,8 @@ public class BspServiceMaster<I extends WritableComparable,
private final String inputSplitsPath;
/** Index of the input split */
private final int index;
+ /** Whether to write locality information */
+ private final boolean writeLocations;
/**
* Constructor
@@ -1828,13 +1834,18 @@ public class BspServiceMaster<I extends WritableComparable,
* @param inputSplit Input split which we are going to write
* @param inputSplitsPath Input splits path
* @param index Index of the input split
+ * @param writeLocations whether to write the input split's locations (to
+ * be used by workers for prioritizing local splits
+ * when reading)
*/
public WriteInputSplit(InputSplit inputSplit,
String inputSplitsPath,
- int index) {
+ int index,
+ boolean writeLocations) {
this.inputSplit = inputSplit;
this.inputSplitsPath = inputSplitsPath;
this.index = index;
+ this.writeLocations = writeLocations;
}
@Override
@@ -1846,19 +1857,22 @@ public class BspServiceMaster<I extends WritableComparable,
DataOutput outputStream =
new DataOutputStream(byteArrayOutputStream);
- String[] splitLocations = inputSplit.getLocations();
- StringBuilder locations = null;
- if (splitLocations != null) {
- int splitListLength =
- Math.min(splitLocations.length, localityLimit);
- locations = new StringBuilder();
- for (String location : splitLocations) {
- locations.append(location)
- .append(--splitListLength > 0 ? "\t" : "");
+ if (writeLocations) {
+ String[] splitLocations = inputSplit.getLocations();
+ StringBuilder locations = null;
+ if (splitLocations != null) {
+ int splitListLength =
+ Math.min(splitLocations.length, localityLimit);
+ locations = new StringBuilder();
+ for (String location : splitLocations) {
+ locations.append(location)
+ .append(--splitListLength > 0 ? "\t" : "");
+ }
}
+ Text.writeString(outputStream,
+ locations == null ? "" : locations.toString());
}
- Text.writeString(outputStream,
- locations == null ? "" : locations.toString());
+
Text.writeString(outputStream,
inputSplit.getClass().getName());
((Writable) inputSplit).write(outputStream);
http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
index 21e59bd..f5b054d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
@@ -60,13 +60,14 @@ public class InputSplitPathOrganizer implements Iterable<String> {
* @param hostName the worker's host name (for matching)
* @param port the port number for this worker
* @param threadId id of the input split thread
+ * @param useLocality whether to prioritize local input splits
*/
public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
final String zkPathList, final String hostName, final int port,
- final int threadId)
+ final int threadId, final boolean useLocality)
throws KeeperException, InterruptedException {
this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
- hostName, port, threadId);
+ hostName, port, threadId, useLocality);
}
/**
@@ -77,30 +78,44 @@ public class InputSplitPathOrganizer implements Iterable<String> {
* @param hostName the worker's host name (for matching)
* @param port the port number for this worker
* @param threadId id of the input split thread
+ * @param useLocality whether to prioritize local input splits
*/
public InputSplitPathOrganizer(
final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
- final String hostName, final int port, final int threadId)
+ final String hostName, final int port, final int threadId,
+ final boolean useLocality)
throws KeeperException, InterruptedException {
this.zooKeeper = zooKeeper;
this.pathList = Lists.newArrayList(inputSplitPathList);
this.hostName = hostName;
- this.baseOffset = 0; // set later after switching out local paths
- prioritizeLocalInputSplits(port, threadId);
+ this.baseOffset = computeBaseOffset(port, threadId);
+ if (useLocality) {
+ prioritizeLocalInputSplits();
+ }
+ }
+
+ /**
+ * Compute base offset to start iterating from,
+ * in order to avoid collisions with other workers/threads.
+ *
+ * @param port the port number for this worker
+ * @param threadId id of the input split thread
+ * @return the offset to start iterating from
+ */
+ private int computeBaseOffset(int port, int threadId) {
+ return pathList.isEmpty() ? 0 :
+ Math.abs(Objects.hashCode(hostName, port, threadId) % pathList.size());
}
- /**
+ /**
* Re-order list of InputSplits so files local to this worker node's
* disk are the first it will iterate over when attempting to claim
* a split to read. This will increase locality of data reads with greater
* probability as the % of total nodes in the cluster hosting data and workers
* BOTH increase towards 100%. Replication increases our chances of a "hit."
*
- * @param port the port number for hashing unique iteration indexes for all
- * workers, even those sharing the same host node.
- * @param threadId id of the input split thread
*/
- private void prioritizeLocalInputSplits(final int port, final int threadId) {
+ private void prioritizeLocalInputSplits() {
List<String> sortedList = new ArrayList<String>();
String hosts;
for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
@@ -121,12 +136,6 @@ public class InputSplitPathOrganizer implements Iterable<String> {
}
// shuffle the local blocks in case several workers exist on this host
Collections.shuffle(sortedList);
- // determine the hash-based offset for this worker to iterate from
- // and place the local blocks into the list at that index, if any
- final int hashOffset = Objects.hashCode(hostName, port, threadId);
- if (pathList.size() != 0) {
- baseOffset = Math.abs(hashOffset % pathList.size());
- }
// re-insert local paths at "adjusted index zero" for caller to iterate on
pathList.addAll(baseOffset, sortedList);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 0f5cdd4..acd4e2d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -18,9 +18,10 @@
package org.apache.giraph.worker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.InputSplitEvents;
import org.apache.giraph.graph.VertexEdgeCount;
@@ -92,6 +93,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
private final String inputSplitFinishedNode;
/** Input split events. */
private final InputSplitEvents inputSplitEvents;
+ /** Whether to prioritize local input splits. */
+ private final boolean useLocality;
// CHECKSTYLE: stop ParameterNumberCheck
/**
@@ -130,10 +133,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
context, graphState.getGraphMapper(), workerClientRequestProcessor,
null);
+ this.useLocality = configuration.getBoolean(
+ GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
+ GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
try {
splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort(),
- threadId);
+ threadId, this.useLocality);
} catch (KeeperException e) {
throw new IllegalStateException(
"InputSplitsCallable: KeeperException", e);
@@ -377,7 +383,9 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
DataInputStream inputStream =
new DataInputStream(new ByteArrayInputStream(splitList));
- Text.readString(inputStream); // location data unused here, skip
+ if (useLocality) {
+ Text.readString(inputStream); // location data unused here, skip
+ }
String inputSplitClass = Text.readString(inputStream);
InputSplit inputSplit = (InputSplit)
ReflectionUtils.newInstance(
http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
index 56ee5a9..987f51c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -341,8 +341,6 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
@Test
public void testInputSplitPathOrganizer()
throws IOException, KeeperException, InterruptedException {
- final List<String> goodList = new ArrayList<String>();
- Collections.addAll(goodList, "local", "remote1", "remote2");
final List<String> testList = new ArrayList<String>();
Collections.addAll(testList, "remote2", "local", "remote1");
final String localHost = "node.LOCAL.com";
@@ -364,17 +362,18 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
Text.writeString(dos, first);
byte[] local = baos.toByteArray();
ZooKeeperExt zk = mock(ZooKeeperExt.class);
- when(zk.getChildrenExt(testListName, false, false, true)).thenReturn(testList);
+ when(zk.getChildrenExt(testListName, false, false, true)).
+ thenReturn(testList);
when(zk.getData("remote1", false, null)).thenReturn(remote1);
when(zk.getData("remote2", false, null)).thenReturn(remote2);
when(zk.getData("local", false, null)).thenReturn(local);
InputSplitPathOrganizer lis =
- new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0);
+ new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0, true);
final List<String> resultList = new ArrayList<String>();
for (String next : lis) {
resultList.add(next);
}
- assertEquals(goodList, resultList);
+ assertEquals("local", resultList.get(0));
}
/**