You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@giraph.apache.org by er...@apache.org on 2012/10/29 16:47:30 UTC
svn commit: r1403386 - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/graph/
Author: ereisman
Date: Mon Oct 29 15:47:30 2012
New Revision: 1403386
URL: http://svn.apache.org/viewvc?rev=1403386&view=rev
Log:
GIRAPH-389: Multithreading should intelligently allocate the thread pools (aching via ereisman)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 29 15:47:30 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-389: Multithreading should intelligently allocate the thread pools. (aching via ereisman)
+
GIRAPH-273: Aggregators shouldn't use Zookeeper (majakabiljo)
GIRAPH-376: Facebook Hadoop profile broken. (nitay via aching)
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Mon Oct 29 15:47:30 2012
@@ -190,94 +190,6 @@ public class BspServiceWorker<I extends
}
/**
- * Try to reserve an InputSplit for loading. While InputSplits exists that
- * are not finished, wait until they are.
- *
- * NOTE: iterations on the InputSplit list only halt for each worker when it
- * has scanned the entire list once and found every split marked RESERVED.
- * When a worker fails, its Ephemeral RESERVED znodes will disappear,
- * allowing other iterating workers to claim it's previously read splits.
- * Only when the last worker left iterating on the list fails can a danger
- * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
- * causes job failure, this is OK. As the failure model evolves, this
- * behavior might need to change.
- *
- * @return reserved InputSplit or null if no unfinished InputSplits exist
- * @throws KeeperException
- * @throws InterruptedException
- */
- private String reserveInputSplit()
- throws KeeperException, InterruptedException {
- if (null == splitOrganizer) {
- splitOrganizer = new InputSplitPathOrganizer(getZkExt(),
- inputSplitsPath, getHostname(), getWorkerInfo().getPort());
- }
- String reservedInputSplitPath = null;
- Stat reservedStat = null;
- final Mapper<?, ?, ?, ?>.Context context = getContext();
- while (true) {
- int reservedInputSplits = 0;
- for (String nextSplitToClaim : splitOrganizer) {
- context.progress();
- String tmpInputSplitReservedPath =
- nextSplitToClaim + INPUT_SPLIT_RESERVED_NODE;
- reservedStat =
- getZkExt().exists(tmpInputSplitReservedPath, true);
- if (reservedStat == null) {
- try {
- // Attempt to reserve this InputSplit
- getZkExt().createExt(tmpInputSplitReservedPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL,
- false);
- reservedInputSplitPath = nextSplitToClaim;
- if (LOG.isInfoEnabled()) {
- float percentFinished =
- reservedInputSplits * 100.0f /
- splitOrganizer.getPathListSize();
- LOG.info("reserveInputSplit: Reserved input " +
- "split path " + reservedInputSplitPath +
- ", overall roughly " +
- + percentFinished +
- "% input splits reserved");
- }
- return reservedInputSplitPath;
- } catch (KeeperException.NodeExistsException e) {
- LOG.info("reserveInputSplit: Couldn't reserve " +
- "(already reserved) inputSplit" +
- " at " + tmpInputSplitReservedPath);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "reserveInputSplit: KeeperException on reserve", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "reserveInputSplit: InterruptedException " +
- "on reserve", e);
- }
- } else {
- ++reservedInputSplits;
- }
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("reserveInputSplit: reservedPath = " +
- reservedInputSplitPath + ", " + reservedInputSplits +
- " of " + splitOrganizer.getPathListSize() +
- " InputSplits are finished.");
- }
- if (reservedInputSplits == splitOrganizer.getPathListSize()) {
- return null;
- }
- getContext().progress();
- // Wait for either a reservation to go away or a notification that
- // an InputSplit has finished.
- context.progress();
- getInputSplitsStateChangedEvent().waitMsecs(60 * 1000);
- getInputSplitsStateChangedEvent().reset();
- }
- }
-
- /**
* Load the vertices from the user-defined VertexReader into our partitions
* of vertex ranges. Do this until all the InputSplits have been processed.
* All workers will try to do as many InputSplits as they can. The master
@@ -289,29 +201,34 @@ public class BspServiceWorker<I extends
* Use one or more threads to do the loading.
*
* @return Statistics of the vertices loaded
- * @throws IOException
- * @throws IllegalAccessException
- * @throws InstantiationException
- * @throws ClassNotFoundException
* @throws InterruptedException
* @throws KeeperException
*/
- private VertexEdgeCount loadVertices() throws IOException,
- ClassNotFoundException, InterruptedException, InstantiationException,
- IllegalAccessException, KeeperException {
+ private VertexEdgeCount loadVertices()
+ throws InterruptedException, KeeperException {
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+ // Get the number of splits first to determine how many threads to use
+ List<String> inputSplitPathList =
+ getZkExt().getChildrenExt(inputSplitsPath, false, false, true);
+
GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
null);
- int numThreads = getConfiguration().getNumInputSplitsThreads();
+ int maxInputSplitThreads =
+ inputSplitPathList.size() / getConfiguration().getMaxWorkers();
+ int numThreads =
+ Math.min(getConfiguration().getNumInputSplitsThreads(),
+ maxInputSplitThreads);
ExecutorService inputSplitsExecutor =
Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("load-%d").build());
List<Future<VertexEdgeCount>> threadsFutures =
Lists.newArrayListWithCapacity(numThreads);
if (LOG.isInfoEnabled()) {
- LOG.info("loadVertices: Using " + numThreads + " threads.");
+ LOG.info("loadVertices: Using " + numThreads + " thread(s), " +
+ "originally " + getConfiguration().getNumInputSplitsThreads() +
+ " threads(s) for " + inputSplitPathList.size() + " total splits.");
}
for (int i = 0; i < numThreads; ++i) {
Callable<VertexEdgeCount> inputSplitsCallable =
@@ -320,7 +237,7 @@ public class BspServiceWorker<I extends
graphState,
getConfiguration(),
this,
- inputSplitsPath,
+ inputSplitPathList,
getWorkerInfo(),
getZkExt());
threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
@@ -422,30 +339,20 @@ else[HADOOP_NON_SECURE]*/
getContext().progress();
+ // Load all the vertices and edges and get some stats about what was loaded
+ VertexEdgeCount vertexEdgeCount = null;
try {
- VertexEdgeCount vertexEdgeCount = loadVertices();
- if (LOG.isInfoEnabled()) {
- LOG.info("setup: Finally loaded a total of " +
- vertexEdgeCount);
- }
- } catch (IOException e) {
- throw new IllegalStateException("setup: loadVertices failed due to " +
- "IOException", e);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException("setup: loadVertices failed due to " +
- "ClassNotFoundException", e);
+ vertexEdgeCount = loadVertices();
} catch (InterruptedException e) {
- throw new IllegalStateException("setup: loadVertices failed due to " +
- "InterruptedException", e);
- } catch (InstantiationException e) {
- throw new IllegalStateException("setup: loadVertices failed due to " +
- "InstantiationException", e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("setup: loadVertices failed due to " +
- "IllegalAccessException", e);
+ throw new IllegalStateException(
+ "setup: loadVertices failed with InterruptedException", e);
} catch (KeeperException e) {
- throw new IllegalStateException("setup: loadVertices failed due to " +
- "KeeperException", e);
+ throw new IllegalStateException(
+ "setup: loadVertices failed with KeeperException", e);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: Finally loaded a total of " +
+ vertexEdgeCount);
}
getContext().progress();
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Mon Oct 29 15:47:30 2012
@@ -451,7 +451,7 @@ public class GraphMapper<I extends Writa
List<PartitionStats> partitionStatsList =
new ArrayList<PartitionStats>();
- int numThreads = conf.getNumComputeThreads();
+ int numComputeThreads = conf.getNumComputeThreads();
FinishedSuperstepStats finishedSuperstepStats = null;
do {
final long superstep = serviceWorker.getSuperstep();
@@ -496,14 +496,15 @@ public class GraphMapper<I extends Writa
MessageStoreByPartition<I, M> messageStore =
serviceWorker.getServerData().getCurrentMessageStore();
- partitionStatsList.clear();
int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
+ int numThreads =
+ Math.min(numComputeThreads, numPartitions);
if (LOG.isInfoEnabled()) {
- LOG.info("map: " + numPartitions +
- " partitions to process in superstep " + superstep + " with " +
- numThreads + " thread(s)");
+ LOG.info("map: " + numPartitions + " partitions to process with " +
+ numThreads + " compute thread(s), originally " +
+ numComputeThreads + " thread(s) on superstep " + superstep);
}
-
+ partitionStatsList.clear();
if (numPartitions > 0) {
List<Future<Collection<PartitionStats>>> partitionFutures =
Lists.newArrayListWithCapacity(numPartitions);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java Mon Oct 29 15:47:30 2012
@@ -17,6 +17,7 @@
*/
package org.apache.giraph.graph;
+import com.google.common.collect.Lists;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
@@ -51,6 +52,7 @@ public class InputSplitPathOrganizer imp
/**
* Constructor
+ *
* @param zooKeeper the worker's ZkExt
* @param zkPathList the path to read from
* @param hostName the worker's host name (for matching)
@@ -59,8 +61,24 @@ public class InputSplitPathOrganizer imp
public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
final String zkPathList, final String hostName, final int port)
throws KeeperException, InterruptedException {
+ this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
+ hostName, port);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param zooKeeper the worker's ZkExt
+ * @param inputSplitPathList path of input splits to read from
+ * @param hostName the worker's host name (for matching)
+ * @param port the port number for this worker
+ */
+ public InputSplitPathOrganizer(
+ final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
+ final String hostName, final int port)
+ throws KeeperException, InterruptedException {
this.zooKeeper = zooKeeper;
- this.pathList = zooKeeper.getChildrenExt(zkPathList, false, false, true);
+ this.pathList = Lists.newArrayList(inputSplitPathList);
this.hostName = hostName;
this.baseOffset = 0; // set later after switching out local paths
prioritizeLocalInputSplits(port);
@@ -72,6 +90,7 @@ public class InputSplitPathOrganizer imp
* a split to read. This will increase locality of data reads with greater
* probability as the % of total nodes in the cluster hosting data and workers
* BOTH increase towards 100%. Replication increases our chances of a "hit."
+ *
* @param port the port number for hashing unique iteration indexes for all
* workers, even those sharing the same host node.
*/
@@ -125,6 +144,7 @@ public class InputSplitPathOrganizer imp
/**
* Utility accessor for Input Split znode path list size
+ *
* @return the size of <code>this.pathList</code>
*/
public int getPathListSize() {
@@ -133,6 +153,7 @@ public class InputSplitPathOrganizer imp
/**
* Iterator for the pathList
+ *
* @return an iterator for our list of input split paths
*/
public Iterator<String> iterator() {
@@ -148,6 +169,7 @@ public class InputSplitPathOrganizer imp
/**
* Do we have more list to iterate upon?
+ *
* @return true if more path strings are available
*/
@Override
@@ -155,7 +177,9 @@ public class InputSplitPathOrganizer imp
return currentIndex < pathList.size();
}
- /** return the next pathList element
+ /**
+ * Return the next pathList element
+ *
* @return the next input split path
*/
@Override
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Mon Oct 29 15:47:30 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.Callable;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
@@ -73,8 +74,6 @@ public class InputSplitsCallable<I exten
* in a tree of child znodes by the master.
*/
private final InputSplitPathOrganizer splitOrganizer;
- /** Location of input splits */
- private final String inputSplitsPath;
/** ZooKeeperExt handle */
private final ZooKeeperExt zooKeeperExt;
/** Configuration */
@@ -98,7 +97,7 @@ public class InputSplitsCallable<I exten
* @param graphState Graph state
* @param configuration Configuration
* @param bspServiceWorker service worker
- * @param inputSplitsPath Path of the input splits
+ * @param inputSplitPathList List of the paths of the input splits
* @param workerInfo This worker's info
* @param zooKeeperExt Handle to ZooKeeperExt
*/
@@ -106,7 +105,7 @@ public class InputSplitsCallable<I exten
Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
BspServiceWorker<I, V, E, M> bspServiceWorker,
- String inputSplitsPath,
+ List<String> inputSplitPathList,
WorkerInfo workerInfo,
ZooKeeperExt zooKeeperExt) {
this.zooKeeperExt = zooKeeperExt;
@@ -117,10 +116,9 @@ public class InputSplitsCallable<I exten
this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
context, graphState.getGraphMapper(), workerClientRequestProcessor);
- this.inputSplitsPath = inputSplitsPath;
try {
splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
- inputSplitsPath, workerInfo.getHostname(), workerInfo.getPort());
+ inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort());
} catch (KeeperException e) {
throw new IllegalStateException(
"InputSplitsCallable: KeeperException", e);