You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2015/10/19 19:59:00 UTC

git commit: updated refs/heads/trunk to 843d186

Repository: giraph
Updated Branches:
  refs/heads/trunk 5b0cd0e0a -> 843d1863d


GIRAPH-1035: Make sure we are able to use all compute threads

Summary: When a job uses few workers and many compute threads, the default logic for choosing the number of partitions ends up choosing fewer partitions than there are threads. Add an additional setting to prevent that.

Test Plan: Ran a job with a few workers and a lot of threads and verified that the number of partitions is set properly. mvn verify passed.

Differential Revision: https://reviews.facebook.net/D48993


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/843d1863
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/843d1863
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/843d1863

Branch: refs/heads/trunk
Commit: 843d1863d3624eeda4f094beefdd1eaf6eb23daf
Parents: 5b0cd0e
Author: Maja Kabiljo <ma...@fb.com>
Authored: Mon Oct 19 10:35:54 2015 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Oct 19 10:58:36 2015 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/conf/GiraphConstants.java |  5 +++
 .../PseudoRandomIntNullLocalEdgesHelper.java    |  4 +-
 .../formats/PseudoRandomLocalEdgesHelper.java   |  4 +-
 .../giraph/partition/HashMasterPartitioner.java |  2 +-
 .../apache/giraph/partition/PartitionUtils.java | 46 ++++++++++++--------
 .../partition/SimpleMasterPartitioner.java      |  2 +-
 6 files changed, 39 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 2804192..5a0328b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -862,6 +862,11 @@ public interface GiraphConstants {
       new FloatConfOption("giraph.masterPartitionCountMultiplier", 1.0f,
           "Multiplier for the current workers squared");
 
+  /** Minimum number of partitions to have per compute thread */
+  IntConfOption MIN_PARTITIONS_PER_COMPUTE_THREAD =
+      new IntConfOption("giraph.minPartitionsPerComputeThread", 1,
+          "Minimum number of partitions to have per compute thread");
+
   /** Overrides default partition count calculation if not -1 */
   IntConfOption USER_PARTITION_COUNT =
       new IntConfOption("giraph.userPartitionCount", -1,

http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
index 46997a8..ab5bfb3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
@@ -56,8 +56,8 @@ public class PseudoRandomIntNullLocalEdgesHelper {
     int numWorkers = conf.getMaxWorkers();
     List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers,
         new WorkerInfo());
-    numPartitions = PartitionUtils.computePartitionCount(workerInfos,
-        numWorkers, conf);
+    numPartitions = PartitionUtils.computePartitionCount(
+        workerInfos.size(), conf);
     partitionSize = numVertices / numPartitions;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
index 84502e1..1b421b7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
@@ -61,8 +61,8 @@ public class PseudoRandomLocalEdgesHelper {
     int numWorkers = conf.getMaxWorkers();
     List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers,
         new WorkerInfo());
-    numPartitions = PartitionUtils.computePartitionCount(workerInfos,
-        numWorkers, conf);
+    numPartitions =
+        PartitionUtils.computePartitionCount(workerInfos.size(), conf);
     partitionSize = numVertices / numPartitions;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
index caede8c..607347d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
@@ -61,7 +61,7 @@ public class HashMasterPartitioner<I extends WritableComparable,
   public Collection<PartitionOwner> createInitialPartitionOwners(
       Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
     int partitionCount = PartitionUtils.computePartitionCount(
-        availableWorkerInfos, maxWorkers, conf);
+        availableWorkerInfos.size(), conf);
     List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
     Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
     for (int i = 0; i < partitionCount; ++i) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index 6914c3b..e4305ff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -36,6 +36,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import static org.apache.giraph.conf.GiraphConstants
+    .MIN_PARTITIONS_PER_COMPUTE_THREAD;
+import static org.apache.giraph.conf.GiraphConstants.NUM_COMPUTE_THREADS;
 import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
 
 /**
@@ -171,15 +174,18 @@ public class PartitionUtils {
   /**
    * Compute the number of partitions, based on the configuration.
    *
-   * @param availableWorkerInfos Available workers.
-   * @param maxWorkers Maximum number of workers.
+   * If USER_PARTITION_COUNT is set, it will follow that; otherwise it will
+   * choose the larger of the counts that MIN_PARTITIONS_PER_COMPUTE_THREAD
+   * and PARTITION_COUNT_MULTIPLIER would produce, capped by the maximum
+   * number of partitions, which is constrained by ZooKeeper.
+   *
+   * @param availableWorkerCount Number of available workers
    * @param conf Configuration.
    * @return Number of partitions for the job.
    */
-  public static int computePartitionCount(
-      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers,
+  public static int computePartitionCount(int availableWorkerCount,
       ImmutableClassesGiraphConfiguration conf) {
-    if (availableWorkerInfos.isEmpty()) {
+    if (availableWorkerCount == 0) {
       throw new IllegalArgumentException(
           "computePartitionCount: No available workers");
     }
@@ -188,32 +194,36 @@ public class PartitionUtils {
     int partitionCount;
     if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) {
       float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf);
-      partitionCount =
-          Math.max((int) (multiplier * availableWorkerInfos.size() *
-              availableWorkerInfos.size()),
-              1);
+      partitionCount = Math.max(
+          (int) (multiplier * availableWorkerCount * availableWorkerCount), 1);
+      int minPartitionsPerComputeThread =
+          MIN_PARTITIONS_PER_COMPUTE_THREAD.get(conf);
+      int totalComputeThreads =
+          NUM_COMPUTE_THREADS.get(conf) * availableWorkerCount;
+      partitionCount = Math.max(partitionCount,
+          minPartitionsPerComputeThread * totalComputeThreads);
     } else {
       partitionCount = userPartitionCount;
     }
     if (LOG.isInfoEnabled()) {
       LOG.info("computePartitionCount: Creating " +
-          partitionCount + ", default would have been " +
-          (availableWorkerInfos.size() *
-              availableWorkerInfos.size()) + " partitions.");
+          partitionCount + " partitions.");
     }
     int maxPartitions = getMaxPartitions(conf);
     if (partitionCount > maxPartitions) {
       // try to keep partitionCount divisible by number of workers
       // in order to keep the balance
-      int reducedPartitions = (maxPartitions / availableWorkerInfos.size()) *
-          availableWorkerInfos.size();
+      int reducedPartitions = (maxPartitions / availableWorkerCount) *
+          availableWorkerCount;
       if (reducedPartitions == 0) {
         reducedPartitions = maxPartitions;
       }
-      LOG.warn("computePartitionCount: " +
-          "Reducing the partitionCount to " + reducedPartitions +
-          " from " + partitionCount + " because of " + maxPartitions +
-          " limit");
+      if (LOG.isInfoEnabled()) {
+        LOG.info("computePartitionCount: " +
+            "Reducing the partitionCount to " + reducedPartitions +
+            " from " + partitionCount + " because of " + maxPartitions +
+            " limit");
+      }
       partitionCount = reducedPartitions;
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
index 7d4c1cb..638dacf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
@@ -61,7 +61,7 @@ public abstract class SimpleMasterPartitioner<I extends WritableComparable,
   public Collection<PartitionOwner> createInitialPartitionOwners(
       Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
     int partitionCount = PartitionUtils.computePartitionCount(
-        availableWorkerInfos, maxWorkers, conf);
+        availableWorkerInfos.size(), conf);
     ArrayList<WorkerInfo> workerList =
         new ArrayList<WorkerInfo>(availableWorkerInfos);