You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2015/10/19 19:59:00 UTC
git commit: updated refs/heads/trunk to 843d186
Repository: giraph
Updated Branches:
refs/heads/trunk 5b0cd0e0a -> 843d1863d
GIRAPH-1035: Make sure we are able to use all compute threads
Summary: When a job uses few workers and many compute threads, the default logic for choosing the number of partitions ends up choosing fewer partitions than there are threads. Add an additional setting to prevent that.
Test Plan: Ran a job with a few workers and a lot of threads and verified that the number of partitions is set properly. mvn verify passed.
Differential Revision: https://reviews.facebook.net/D48993
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/843d1863
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/843d1863
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/843d1863
Branch: refs/heads/trunk
Commit: 843d1863d3624eeda4f094beefdd1eaf6eb23daf
Parents: 5b0cd0e
Author: Maja Kabiljo <ma...@fb.com>
Authored: Mon Oct 19 10:35:54 2015 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Oct 19 10:58:36 2015 -0700
----------------------------------------------------------------------
.../org/apache/giraph/conf/GiraphConstants.java | 5 +++
.../PseudoRandomIntNullLocalEdgesHelper.java | 4 +-
.../formats/PseudoRandomLocalEdgesHelper.java | 4 +-
.../giraph/partition/HashMasterPartitioner.java | 2 +-
.../apache/giraph/partition/PartitionUtils.java | 46 ++++++++++++--------
.../partition/SimpleMasterPartitioner.java | 2 +-
6 files changed, 39 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 2804192..5a0328b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -862,6 +862,11 @@ public interface GiraphConstants {
new FloatConfOption("giraph.masterPartitionCountMultiplier", 1.0f,
"Multiplier for the current workers squared");
+ /** Minimum number of partitions to have per compute thread */
+ IntConfOption MIN_PARTITIONS_PER_COMPUTE_THREAD =
+ new IntConfOption("giraph.minPartitionsPerComputeThread", 1,
+ "Minimum number of partitions to have per compute thread");
+
/** Overrides default partition count calculation if not -1 */
IntConfOption USER_PARTITION_COUNT =
new IntConfOption("giraph.userPartitionCount", -1,
http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
index 46997a8..ab5bfb3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
@@ -56,8 +56,8 @@ public class PseudoRandomIntNullLocalEdgesHelper {
int numWorkers = conf.getMaxWorkers();
List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers,
new WorkerInfo());
- numPartitions = PartitionUtils.computePartitionCount(workerInfos,
- numWorkers, conf);
+ numPartitions = PartitionUtils.computePartitionCount(
+ workerInfos.size(), conf);
partitionSize = numVertices / numPartitions;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
index 84502e1..1b421b7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
@@ -61,8 +61,8 @@ public class PseudoRandomLocalEdgesHelper {
int numWorkers = conf.getMaxWorkers();
List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers,
new WorkerInfo());
- numPartitions = PartitionUtils.computePartitionCount(workerInfos,
- numWorkers, conf);
+ numPartitions =
+ PartitionUtils.computePartitionCount(workerInfos.size(), conf);
partitionSize = numVertices / numPartitions;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
index caede8c..607347d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
@@ -61,7 +61,7 @@ public class HashMasterPartitioner<I extends WritableComparable,
public Collection<PartitionOwner> createInitialPartitionOwners(
Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
int partitionCount = PartitionUtils.computePartitionCount(
- availableWorkerInfos, maxWorkers, conf);
+ availableWorkerInfos.size(), conf);
List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
for (int i = 0; i < partitionCount; ++i) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index 6914c3b..e4305ff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -36,6 +36,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static org.apache.giraph.conf.GiraphConstants
+ .MIN_PARTITIONS_PER_COMPUTE_THREAD;
+import static org.apache.giraph.conf.GiraphConstants.NUM_COMPUTE_THREADS;
import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
/**
@@ -171,15 +174,18 @@ public class PartitionUtils {
/**
* Compute the number of partitions, based on the configuration.
*
- * @param availableWorkerInfos Available workers.
- * @param maxWorkers Maximum number of workers.
+ * If USER_PARTITION_COUNT is set, it will follow that, otherwise it will
+ * choose the max of what MIN_PARTITIONS_PER_COMPUTE_THREAD and
+ * PARTITION_COUNT_MULTIPLIER settings would choose, capped by max
+ * partitions limited constrained by zookeeper.
+ *
+ * @param availableWorkerCount Number of available workers
* @param conf Configuration.
* @return Number of partitions for the job.
*/
- public static int computePartitionCount(
- Collection<WorkerInfo> availableWorkerInfos, int maxWorkers,
+ public static int computePartitionCount(int availableWorkerCount,
ImmutableClassesGiraphConfiguration conf) {
- if (availableWorkerInfos.isEmpty()) {
+ if (availableWorkerCount == 0) {
throw new IllegalArgumentException(
"computePartitionCount: No available workers");
}
@@ -188,32 +194,36 @@ public class PartitionUtils {
int partitionCount;
if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) {
float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf);
- partitionCount =
- Math.max((int) (multiplier * availableWorkerInfos.size() *
- availableWorkerInfos.size()),
- 1);
+ partitionCount = Math.max(
+ (int) (multiplier * availableWorkerCount * availableWorkerCount), 1);
+ int minPartitionsPerComputeThread =
+ MIN_PARTITIONS_PER_COMPUTE_THREAD.get(conf);
+ int totalComputeThreads =
+ NUM_COMPUTE_THREADS.get(conf) * availableWorkerCount;
+ partitionCount = Math.max(partitionCount,
+ minPartitionsPerComputeThread * totalComputeThreads);
} else {
partitionCount = userPartitionCount;
}
if (LOG.isInfoEnabled()) {
LOG.info("computePartitionCount: Creating " +
- partitionCount + ", default would have been " +
- (availableWorkerInfos.size() *
- availableWorkerInfos.size()) + " partitions.");
+ partitionCount + " partitions.");
}
int maxPartitions = getMaxPartitions(conf);
if (partitionCount > maxPartitions) {
// try to keep partitionCount divisible by number of workers
// in order to keep the balance
- int reducedPartitions = (maxPartitions / availableWorkerInfos.size()) *
- availableWorkerInfos.size();
+ int reducedPartitions = (maxPartitions / availableWorkerCount) *
+ availableWorkerCount;
if (reducedPartitions == 0) {
reducedPartitions = maxPartitions;
}
- LOG.warn("computePartitionCount: " +
- "Reducing the partitionCount to " + reducedPartitions +
- " from " + partitionCount + " because of " + maxPartitions +
- " limit");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("computePartitionCount: " +
+ "Reducing the partitionCount to " + reducedPartitions +
+ " from " + partitionCount + " because of " + maxPartitions +
+ " limit");
+ }
partitionCount = reducedPartitions;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
index 7d4c1cb..638dacf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
@@ -61,7 +61,7 @@ public abstract class SimpleMasterPartitioner<I extends WritableComparable,
public Collection<PartitionOwner> createInitialPartitionOwners(
Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
int partitionCount = PartitionUtils.computePartitionCount(
- availableWorkerInfos, maxWorkers, conf);
+ availableWorkerInfos.size(), conf);
ArrayList<WorkerInfo> workerList =
new ArrayList<WorkerInfo>(availableWorkerInfos);