You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@giraph.apache.org by er...@apache.org on 2012/10/29 16:47:30 UTC

svn commit: r1403386 - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/graph/

Author: ereisman
Date: Mon Oct 29 15:47:30 2012
New Revision: 1403386

URL: http://svn.apache.org/viewvc?rev=1403386&view=rev
Log:
GIRAPH-389: Multithreading should intelligently allocate the thread pools (aching via ereisman)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 29 15:47:30 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-389: Multithreading should intelligently allocate the thread pools. (aching via ereisman)
+
   GIRAPH-273: Aggregators shouldn't use Zookeeper (majakabiljo)
 
   GIRAPH-376: Facebook Hadoop profile broken. (nitay via aching)

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Mon Oct 29 15:47:30 2012
@@ -190,94 +190,6 @@ public class BspServiceWorker<I extends 
   }
 
   /**
-   * Try to reserve an InputSplit for loading.  While InputSplits exists that
-   * are not finished, wait until they are.
-   *
-   * NOTE: iterations on the InputSplit list only halt for each worker when it
-   * has scanned the entire list once and found every split marked RESERVED.
-   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
-   * allowing other iterating workers to claim it's previously read splits.
-   * Only when the last worker left iterating on the list fails can a danger
-   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
-   * causes job failure, this is OK. As the failure model evolves, this
-   * behavior might need to change.
-   *
-   * @return reserved InputSplit or null if no unfinished InputSplits exist
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  private String reserveInputSplit()
-    throws KeeperException, InterruptedException {
-    if (null == splitOrganizer) {
-      splitOrganizer = new InputSplitPathOrganizer(getZkExt(),
-        inputSplitsPath, getHostname(), getWorkerInfo().getPort());
-    }
-    String reservedInputSplitPath = null;
-    Stat reservedStat = null;
-    final Mapper<?, ?, ?, ?>.Context context = getContext();
-    while (true) {
-      int reservedInputSplits = 0;
-      for (String nextSplitToClaim : splitOrganizer) {
-        context.progress();
-        String tmpInputSplitReservedPath =
-            nextSplitToClaim + INPUT_SPLIT_RESERVED_NODE;
-        reservedStat =
-            getZkExt().exists(tmpInputSplitReservedPath, true);
-        if (reservedStat == null) {
-          try {
-            // Attempt to reserve this InputSplit
-            getZkExt().createExt(tmpInputSplitReservedPath,
-                null,
-                Ids.OPEN_ACL_UNSAFE,
-                CreateMode.EPHEMERAL,
-                false);
-            reservedInputSplitPath = nextSplitToClaim;
-            if (LOG.isInfoEnabled()) {
-              float percentFinished =
-                  reservedInputSplits * 100.0f /
-                  splitOrganizer.getPathListSize();
-              LOG.info("reserveInputSplit: Reserved input " +
-                  "split path " + reservedInputSplitPath +
-                  ", overall roughly " +
-                  + percentFinished +
-                  "% input splits reserved");
-            }
-            return reservedInputSplitPath;
-          } catch (KeeperException.NodeExistsException e) {
-            LOG.info("reserveInputSplit: Couldn't reserve " +
-                "(already reserved) inputSplit" +
-                " at " + tmpInputSplitReservedPath);
-          } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "reserveInputSplit: KeeperException on reserve", e);
-          } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "reserveInputSplit: InterruptedException " +
-                    "on reserve", e);
-          }
-        } else {
-          ++reservedInputSplits;
-        }
-      }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("reserveInputSplit: reservedPath = " +
-            reservedInputSplitPath + ", " + reservedInputSplits +
-            " of " + splitOrganizer.getPathListSize() +
-            " InputSplits are finished.");
-      }
-      if (reservedInputSplits == splitOrganizer.getPathListSize()) {
-        return null;
-      }
-      getContext().progress();
-      // Wait for either a reservation to go away or a notification that
-      // an InputSplit has finished.
-      context.progress();
-      getInputSplitsStateChangedEvent().waitMsecs(60 * 1000);
-      getInputSplitsStateChangedEvent().reset();
-    }
-  }
-
-  /**
    * Load the vertices from the user-defined VertexReader into our partitions
    * of vertex ranges.  Do this until all the InputSplits have been processed.
    * All workers will try to do as many InputSplits as they can.  The master
@@ -289,29 +201,34 @@ public class BspServiceWorker<I extends 
    * Use one or more threads to do the loading.
    *
    * @return Statistics of the vertices loaded
-   * @throws IOException
-   * @throws IllegalAccessException
-   * @throws InstantiationException
-   * @throws ClassNotFoundException
    * @throws InterruptedException
    * @throws KeeperException
    */
-  private VertexEdgeCount loadVertices() throws IOException,
-    ClassNotFoundException, InterruptedException, InstantiationException,
-    IllegalAccessException, KeeperException {
+  private VertexEdgeCount loadVertices()
+    throws InterruptedException, KeeperException {
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
 
+    // Get the number of splits first to determine how many threads to use
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(inputSplitsPath, false, false, true);
+
     GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
         INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
         null);
-    int numThreads = getConfiguration().getNumInputSplitsThreads();
+    int maxInputSplitThreads =
+        inputSplitPathList.size() / getConfiguration().getMaxWorkers();
+    int numThreads =
+        Math.min(getConfiguration().getNumInputSplitsThreads(),
+            maxInputSplitThreads);
     ExecutorService inputSplitsExecutor =
         Executors.newFixedThreadPool(numThreads,
             new ThreadFactoryBuilder().setNameFormat("load-%d").build());
     List<Future<VertexEdgeCount>> threadsFutures =
         Lists.newArrayListWithCapacity(numThreads);
     if (LOG.isInfoEnabled()) {
-      LOG.info("loadVertices: Using " + numThreads + " threads.");
+      LOG.info("loadVertices: Using " + numThreads + " thread(s), " +
+          "originally " + getConfiguration().getNumInputSplitsThreads() +
+          " threads(s) for " + inputSplitPathList.size() + " total splits.");
     }
     for (int i = 0; i < numThreads; ++i) {
       Callable<VertexEdgeCount> inputSplitsCallable =
@@ -320,7 +237,7 @@ public class BspServiceWorker<I extends 
               graphState,
               getConfiguration(),
               this,
-              inputSplitsPath,
+              inputSplitPathList,
               getWorkerInfo(),
               getZkExt());
       threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
@@ -422,30 +339,20 @@ else[HADOOP_NON_SECURE]*/
 
     getContext().progress();
 
+    // Load all the vertices and edges and get some stats about what was loaded
+    VertexEdgeCount vertexEdgeCount = null;
     try {
-      VertexEdgeCount vertexEdgeCount = loadVertices();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("setup: Finally loaded a total of " +
-            vertexEdgeCount);
-      }
-    } catch (IOException e) {
-      throw new IllegalStateException("setup: loadVertices failed due to " +
-          "IOException", e);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException("setup: loadVertices failed due to " +
-          "ClassNotFoundException", e);
+      vertexEdgeCount = loadVertices();
     } catch (InterruptedException e) {
-      throw new IllegalStateException("setup: loadVertices failed due to " +
-          "InterruptedException", e);
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("setup: loadVertices failed due to " +
-          "InstantiationException", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("setup: loadVertices failed due to " +
-          "IllegalAccessException", e);
+      throw new IllegalStateException(
+          "setup: loadVertices failed with InterruptedException", e);
     } catch (KeeperException e) {
-      throw new IllegalStateException("setup: loadVertices failed due to " +
-          "KeeperException", e);
+      throw new IllegalStateException(
+          "setup: loadVertices failed with KeeperException", e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("setup: Finally loaded a total of " +
+          vertexEdgeCount);
     }
     getContext().progress();
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Mon Oct 29 15:47:30 2012
@@ -451,7 +451,7 @@ public class GraphMapper<I extends Writa
     List<PartitionStats> partitionStatsList =
         new ArrayList<PartitionStats>();
 
-    int numThreads = conf.getNumComputeThreads();
+    int numComputeThreads = conf.getNumComputeThreads();
     FinishedSuperstepStats finishedSuperstepStats = null;
     do {
       final long superstep = serviceWorker.getSuperstep();
@@ -496,14 +496,15 @@ public class GraphMapper<I extends Writa
 
       MessageStoreByPartition<I, M> messageStore =
           serviceWorker.getServerData().getCurrentMessageStore();
-      partitionStatsList.clear();
       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
+      int numThreads =
+          Math.min(numComputeThreads, numPartitions);
       if (LOG.isInfoEnabled()) {
-        LOG.info("map: " + numPartitions +
-            " partitions to process in superstep " + superstep + " with " +
-            numThreads + " thread(s)");
+        LOG.info("map: " + numPartitions + " partitions to process with " +
+            numThreads + " compute thread(s), originally " +
+            numComputeThreads + " thread(s) on superstep " + superstep);
       }
-
+      partitionStatsList.clear();
       if (numPartitions > 0) {
         List<Future<Collection<PartitionStats>>> partitionFutures =
             Lists.newArrayListWithCapacity(numPartitions);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java Mon Oct 29 15:47:30 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.giraph.graph;
 
+import com.google.common.collect.Lists;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -51,6 +52,7 @@ public class InputSplitPathOrganizer imp
 
   /**
    * Constructor
+   *
    * @param zooKeeper the worker's ZkExt
    * @param zkPathList the path to read from
    * @param hostName the worker's host name (for matching)
@@ -59,8 +61,24 @@ public class InputSplitPathOrganizer imp
   public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
     final String zkPathList, final String hostName, final int port)
     throws KeeperException, InterruptedException {
+    this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
+        hostName, port);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param zooKeeper the worker's ZkExt
+   * @param inputSplitPathList path of input splits to read from
+   * @param hostName the worker's host name (for matching)
+   * @param port the port number for this worker
+   */
+  public InputSplitPathOrganizer(
+      final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
+      final String hostName, final int port)
+    throws KeeperException, InterruptedException {
     this.zooKeeper = zooKeeper;
-    this.pathList = zooKeeper.getChildrenExt(zkPathList, false, false, true);
+    this.pathList = Lists.newArrayList(inputSplitPathList);
     this.hostName = hostName;
     this.baseOffset = 0; // set later after switching out local paths
     prioritizeLocalInputSplits(port);
@@ -72,6 +90,7 @@ public class InputSplitPathOrganizer imp
   * a split to read. This will increase locality of data reads with greater
   * probability as the % of total nodes in the cluster hosting data and workers
   * BOTH increase towards 100%. Replication increases our chances of a "hit."
+  *
   * @param port the port number for hashing unique iteration indexes for all
   *             workers, even those sharing the same host node.
   */
@@ -125,6 +144,7 @@ public class InputSplitPathOrganizer imp
 
   /**
    * Utility accessor for Input Split znode path list size
+   *
    * @return the size of <code>this.pathList</code>
    */
   public int getPathListSize() {
@@ -133,6 +153,7 @@ public class InputSplitPathOrganizer imp
 
   /**
    * Iterator for the pathList
+   *
    * @return an iterator for our list of input split paths
    */
   public Iterator<String> iterator() {
@@ -148,6 +169,7 @@ public class InputSplitPathOrganizer imp
 
     /**
      *  Do we have more list to iterate upon?
+     *
      *  @return true if more path strings are available
      */
     @Override
@@ -155,7 +177,9 @@ public class InputSplitPathOrganizer imp
       return currentIndex < pathList.size();
     }
 
-    /** return the next pathList element
+    /**
+     * Return the next pathList element
+     *
      * @return the next input split path
      */
     @Override

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1403386&r1=1403385&r2=1403386&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Mon Oct 29 15:47:30 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.Callable;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
@@ -73,8 +74,6 @@ public class InputSplitsCallable<I exten
    * in a tree of child znodes by the master.
    */
   private final InputSplitPathOrganizer splitOrganizer;
-  /** Location of input splits */
-  private final String inputSplitsPath;
   /** ZooKeeperExt handle */
   private final ZooKeeperExt zooKeeperExt;
   /** Configuration */
@@ -98,7 +97,7 @@ public class InputSplitsCallable<I exten
    * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker service worker
-   * @param inputSplitsPath Path of the input splits
+   * @param inputSplitPathList List of the paths of the input splits
    * @param workerInfo This worker's info
    * @param zooKeeperExt Handle to ZooKeeperExt
    */
@@ -106,7 +105,7 @@ public class InputSplitsCallable<I exten
       Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
-      String inputSplitsPath,
+      List<String> inputSplitPathList,
       WorkerInfo workerInfo,
       ZooKeeperExt zooKeeperExt)  {
     this.zooKeeperExt = zooKeeperExt;
@@ -117,10 +116,9 @@ public class InputSplitsCallable<I exten
     this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
         graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
         context, graphState.getGraphMapper(), workerClientRequestProcessor);
-    this.inputSplitsPath = inputSplitsPath;
     try {
       splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
-          inputSplitsPath, workerInfo.getHostname(), workerInfo.getPort());
+          inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort());
     } catch (KeeperException e) {
       throw new IllegalStateException(
           "InputSplitsCallable: KeeperException", e);