You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/02/25 21:06:47 UTC
git commit: GIRAPH-535: Range-partitioning and edge locality
benchmark (apresta)
Updated Branches:
refs/heads/trunk 71eab655e -> 507959dcb
GIRAPH-535: Range-partitioning and edge locality benchmark (apresta)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/507959dc
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/507959dc
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/507959dc
Branch: refs/heads/trunk
Commit: 507959dcb4af9b5c5f26a97c237cdb7b6235e7a1
Parents: 71eab65
Author: Alessandro Presta <al...@fb.com>
Authored: Mon Feb 25 11:57:41 2013 -0800
Committer: Alessandro Presta <al...@fb.com>
Committed: Mon Feb 25 12:06:27 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 4 +
.../giraph/benchmark/AggregatorsBenchmark.java | 5 +-
.../apache/giraph/benchmark/PageRankBenchmark.java | 52 ++++++---
.../giraph/benchmark/RandomMessageBenchmark.java | 5 +-
.../giraph/benchmark/ShortestPathsBenchmark.java | 5 +-
.../org/apache/giraph/conf/GiraphConstants.java | 17 +++
.../io/formats/PseudoRandomEdgeInputFormat.java | 29 +++---
.../io/formats/PseudoRandomVertexInputFormat.java | 34 +++---
.../giraph/partition/HashMasterPartitioner.java | 80 ++------------
.../giraph/partition/HashWorkerPartitioner.java | 44 +-------
.../apache/giraph/partition/PartitionBalancer.java | 66 +++++++++++-
.../apache/giraph/partition/PartitionUtils.java | 86 +++++++++++++-
.../org/apache/giraph/io/TestJsonBase64Format.java | 9 +-
.../org/apache/giraph/TestGraphPartitioner.java | 39 ++++++-
.../org/apache/giraph/TestPartitionContext.java | 10 +-
.../org/apache/giraph/examples/TestPageRank.java | 4 +-
16 files changed, 292 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3323438..d1a3d1e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,8 +1,12 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-535: Range-partitioning and edge locality benchmark (apresta)
+
GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo)
+ GIRAPH-530: GiraphInputFormat#getSplits() should be aware of multithreaded input (apresta)
+
GIRAPH-532: Give an explanation when trying to use unregistered aggregators (majakabiljo)
GIRAPH-453: Pure Hive I/O (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
index a82a9f8..4e47042 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.worker.DefaultWorkerContext;
import org.apache.giraph.vertex.EdgeListVertex;
@@ -257,10 +258,10 @@ public class AggregatorsBenchmark implements Tool {
AggregatorsBenchmarkWorkerContext.class);
job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
Long.parseLong(cmd.getOptionValue('V')));
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
1);
job.getConfiguration().setInt(AGGREGATORS_NUM,
Integer.parseInt(cmd.getOptionValue('a')));
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 8341dce..06ee80c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -25,10 +25,12 @@ import org.apache.commons.cli.PosixParser;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.combiner.DoubleSumCombiner;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat;
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
+import org.apache.giraph.partition.SimpleLongRangePartitionerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -90,6 +92,15 @@ public class PageRankBenchmark implements Tool {
"EdgeInputFormat), " +
"7 for MultiGraphByteArrayVertex with unsafe (using " +
"EdgeInputFormat))");
+ options.addOption("l",
+ "localEdgesMinRatio",
+ true,
+ "Minimum ratio of partition-local edges (default is 0)");
+ options.addOption("p",
+ "partitioner",
+ true,
+ "Partitioning algorithm (0 for hash partitioning (default), " +
+ "1 for range partitioning)");
options.addOption("N",
"name",
true,
@@ -140,7 +151,7 @@ public class PageRankBenchmark implements Tool {
GiraphJob job = new GiraphJob(getConf(), name);
GiraphConfiguration configuration = job.getConfiguration();
- setVertexAndInputFormatClasses(cmd, configuration);
+ setClassesAndParameters(cmd, configuration);
configuration.setWorkerConfiguration(workers, workers, 100.0f);
configuration.setInt(
@@ -159,12 +170,13 @@ public class PageRankBenchmark implements Tool {
}
/**
- * Set vertex class and input format class based on command-line arguments.
+ * Set vertex, input format, partitioner classes and related parameters
+ * based on command-line arguments.
*
* @param cmd Command line arguments
* @param configuration Giraph job configuration
*/
- protected void setVertexAndInputFormatClasses(
+ protected void setClassesAndParameters(
CommandLine cmd, GiraphConfiguration configuration) {
int vertexClassOption = cmd.hasOption('c') ? Integer.parseInt(
cmd.getOptionValue('c')) : 1;
@@ -201,24 +213,24 @@ public class PageRankBenchmark implements Tool {
configuration.setVertexCombinerClass(
DoubleSumCombiner.class);
}
+
if (vertexClassOption <= 3) {
configuration.setVertexInputFormatClass(
PseudoRandomVertexInputFormat.class);
- configuration.setLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
- Long.parseLong(cmd.getOptionValue('V')));
- configuration.setLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
- Long.parseLong(cmd.getOptionValue('e')));
} else {
- configuration.setEdgeInputFormatClass(
- PseudoRandomEdgeInputFormat.class);
- configuration.setLong(
- PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES,
- Long.parseLong(cmd.getOptionValue('V')));
- configuration.setLong(
- PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX,
- Long.parseLong(cmd.getOptionValue('e')));
+ configuration.setEdgeInputFormatClass(PseudoRandomEdgeInputFormat.class);
+ }
+ configuration.setLong(
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
+ Long.parseLong(cmd.getOptionValue('V')));
+ configuration.setLong(
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
+ Long.parseLong(cmd.getOptionValue('e')));
+ if (cmd.hasOption('l')) {
+ float localEdgesMinRatio = Float.parseFloat(cmd.getOptionValue('l'));
+ configuration.setFloat(
+ PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
+ localEdgesMinRatio);
}
int vertexOutputClassOption =
@@ -229,6 +241,12 @@ public class PageRankBenchmark implements Tool {
configuration.setVertexOutputFormatClass(
JsonBase64VertexOutputFormat.class);
}
+
+ if (cmd.hasOption('p') &&
+ Integer.parseInt(cmd.getOptionValue('p')) == 1) {
+ configuration.setGraphPartitionerFactoryClass(
+ SimpleLongRangePartitionerFactory.class);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
index d0d80af..c8e33dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.vertex.EdgeListVertex;
import org.apache.giraph.job.GiraphJob;
@@ -362,10 +363,10 @@ public class RandomMessageBenchmark implements Tool {
RandomMessageBenchmarkMasterCompute.class);
job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
Long.parseLong(cmd.getOptionValue('V')));
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
Long.parseLong(cmd.getOptionValue('e')));
job.getConfiguration().setInt(
SUPERSTEP_COUNT,
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 52bbac4..1843da9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.combiner.MinimumDoubleCombiner;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
import org.apache.giraph.vertex.EdgeListVertex;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
@@ -139,10 +140,10 @@ public class ShortestPathsBenchmark implements Tool {
}
job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
Long.parseLong(cmd.getOptionValue('V')));
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
Long.parseLong(cmd.getOptionValue('e')));
boolean isVerbose = false;
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index e0aeba3..fcdd57b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -497,6 +497,23 @@ public interface GiraphConstants {
*/
boolean USE_INPUT_SPLIT_LOCALITY_DEFAULT = true;
+ /** Multiplier for the current workers squared */
+ String PARTITION_COUNT_MULTIPLIER =
+ "partition.masterPartitionCountMultipler";
+ /** Default multiplier for current workers squared */
+ float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
+
+ /** Overrides default partition count calculation if not -1 */
+ String USER_PARTITION_COUNT =
+ "partition.userPartitionCount";
+ /** Default user partition count */
+ int DEFAULT_USER_PARTITION_COUNT = -1;
+
+ /** Vertex key space size for
+ * {@link org.apache.giraph.partition.SimpleRangeWorkerPartitioner}
+ */
+ String PARTITION_VERTEX_KEY_SPACE_SIZE = "partition.vertexKeySpaceSize";
+
/** Java opts passed to ZooKeeper startup */
String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts";
/** Default java opts passed to ZooKeeper startup */
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
index d197925..2024863 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -46,13 +46,6 @@ import java.util.Set;
*/
public class PseudoRandomEdgeInputFormat
extends EdgeInputFormat<LongWritable, DoubleWritable> {
- /** Set the number of aggregate vertices. */
- public static final String AGGREGATE_VERTICES =
- "pseudoRandomEdgeInputFormat.aggregateVertices";
- /** Set the number of edges per vertex (pseudo-random destination). */
- public static final String EDGES_PER_VERTEX =
- "pseudoRandomEdgeInputFormat.edgesPerVertex";
-
@Override
public final List<InputSplit> getSplits(final JobContext context,
final int minSplitCountHint)
@@ -103,6 +96,8 @@ public class PseudoRandomEdgeInputFormat
private BspInputSplit bspInputSplit;
/** Saved configuration */
private ImmutableClassesGiraphConfiguration configuration;
+ /** Helper for generating pseudo-random local edges. */
+ private PseudoRandomLocalEdgesHelper localEdgesHelper;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
@@ -111,10 +106,10 @@ public class PseudoRandomEdgeInputFormat
context.getConfiguration());
aggregateVertices =
configuration.getLong(
- PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES, 0);
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
if (aggregateVertices <= 0) {
throw new IllegalArgumentException(
- PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES + " <= 0");
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
}
if (inputSplit instanceof BspInputSplit) {
bspInputSplit = (BspInputSplit) inputSplit;
@@ -135,11 +130,16 @@ public class PseudoRandomEdgeInputFormat
" instead of " + BspInputSplit.class);
}
edgesPerVertex = configuration.getLong(
- PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX, 0);
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
if (edgesPerVertex <= 0) {
throw new IllegalArgumentException(
- PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX + " <= 0");
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
}
+ float minLocalEdgesRatio = configuration.getFloat(
+ PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
+ PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
+ localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
+ minLocalEdgesRatio, configuration);
}
@Override
@@ -172,11 +172,10 @@ public class PseudoRandomEdgeInputFormat
@Override
public Edge<LongWritable, DoubleWritable> getCurrentEdge()
throws IOException, InterruptedException {
- LongWritable destVertexId;
+ LongWritable destVertexId = new LongWritable();
do {
- destVertexId =
- new LongWritable(Math.abs(random.nextLong()) %
- aggregateVertices);
+ destVertexId.set(localEdgesHelper.generateDestVertex(
+ currentVertexId.get(), random));
} while (currentVertexDestVertices.contains(destVertexId));
++currentVertexEdgesRead;
currentVertexDestVertices.add(destVertexId);
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
index 19bc3b8..4da8f9d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -18,6 +18,8 @@
package org.apache.giraph.io.formats;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Edge;
@@ -33,9 +35,6 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -52,13 +51,6 @@ import java.util.Set;
*/
public class PseudoRandomVertexInputFormat<M extends Writable> extends
VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> {
- /** Set the number of aggregate vertices. */
- public static final String AGGREGATE_VERTICES =
- "pseudoRandomVertexInputFormat.aggregateVertices";
- /** Set the number of edges per vertex (pseudo-random destination). */
- public static final String EDGES_PER_VERTEX =
- "pseudoRandomVertexInputFormat.edgesPerVertex";
-
@Override
public final List<InputSplit> getSplits(final JobContext context,
final int minSplitCountHint) throws IOException, InterruptedException {
@@ -101,6 +93,8 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
private BspInputSplit bspInputSplit;
/** Saved configuration */
private ImmutableClassesGiraphConfiguration configuration;
+ /** Helper for generating pseudo-random local edges. */
+ private PseudoRandomLocalEdgesHelper localEdgesHelper;
/**
* Default constructor for reflection.
@@ -115,10 +109,10 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
context.getConfiguration());
aggregateVertices =
configuration.getLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0);
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
if (aggregateVertices <= 0) {
throw new IllegalArgumentException(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES + " <= 0");
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
}
if (inputSplit instanceof BspInputSplit) {
bspInputSplit = (BspInputSplit) inputSplit;
@@ -139,11 +133,16 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
" instead of " + BspInputSplit.class);
}
edgesPerVertex = configuration.getLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 0);
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
if (edgesPerVertex <= 0) {
throw new IllegalArgumentException(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX + " <= 0");
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
}
+ float minLocalEdgesRatio = configuration.getFloat(
+ PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
+ PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
+ localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
+ minLocalEdgesRatio, configuration);
}
@Override
@@ -166,11 +165,10 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
Lists.newArrayListWithCapacity((int) edgesPerVertex);
Set<LongWritable> destVertices = Sets.newHashSet();
for (long i = 0; i < edgesPerVertex; ++i) {
- LongWritable destVertexId = null;
+ LongWritable destVertexId = new LongWritable();
do {
- destVertexId =
- new LongWritable(Math.abs(rand.nextLong()) %
- aggregateVertices);
+ destVertexId.set(
+ localEdgesHelper.generateDestVertex(vertexId, rand));
} while (destVertices.contains(destVertexId));
edges.add(EdgeFactory.create(destVertexId,
new DoubleWritable(rand.nextDouble())));
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
index a9611d9..5faf367 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
@@ -18,17 +18,17 @@
package org.apache.giraph.partition;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
/**
* Master will execute a hash based partitioning.
*
@@ -41,24 +41,10 @@ import org.apache.log4j.Logger;
public class HashMasterPartitioner<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> implements
MasterGraphPartitioner<I, V, E, M> {
- /** Multiplier for the current workers squared */
- public static final String PARTITION_COUNT_MULTIPLIER =
- "hash.masterPartitionCountMultipler";
- /** Default mulitplier for current workers squared */
- public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
- /** Overrides default partition count calculation if not -1 */
- public static final String USER_PARTITION_COUNT =
- "hash.userPartitionCount";
- /** Default user partition count */
- public static final int DEFAULT_USER_PARTITION_COUNT = -1;
/** Class logger */
private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
/** Provided configuration */
private ImmutableClassesGiraphConfiguration conf;
- /** Specified partition count (overrides calculation) */
- private final int userPartitionCount;
- /** Partition count (calculated in createInitialPartitionOwners) */
- private int partitionCount = -1;
/** Save the last generated partition owner list */
private List<PartitionOwner> partitionOwnerList;
@@ -69,44 +55,15 @@ public class HashMasterPartitioner<I extends WritableComparable,
*/
public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
- userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
- DEFAULT_USER_PARTITION_COUNT);
}
@Override
public Collection<PartitionOwner> createInitialPartitionOwners(
Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
- if (availableWorkerInfos.isEmpty()) {
- throw new IllegalArgumentException(
- "createInitialPartitionOwners: No available workers");
- }
+ int partitionCount = PartitionUtils.computePartitionCount(
+ availableWorkerInfos, maxWorkers, conf);
List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
- if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
- float multiplier = conf.getFloat(
- PARTITION_COUNT_MULTIPLIER,
- DEFAULT_PARTITION_COUNT_MULTIPLIER);
- partitionCount =
- Math.max((int) (multiplier * availableWorkerInfos.size() *
- availableWorkerInfos.size()),
- 1);
- } else {
- partitionCount = userPartitionCount;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("createInitialPartitionOwners: Creating " +
- partitionCount + ", default would have been " +
- (availableWorkerInfos.size() *
- availableWorkerInfos.size()) + " partitions.");
- }
- int maxPartitions = getMaxPartitions();
- if (partitionCount > maxPartitions) {
- LOG.warn("createInitialPartitionOwners: " +
- "Reducing the partitionCount to " + maxPartitions +
- " from " + partitionCount);
- partitionCount = maxPartitions;
- }
-
for (int i = 0; i < partitionCount; ++i) {
PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
if (!workerIt.hasNext()) {
@@ -151,26 +108,5 @@ public class HashMasterPartitioner<I extends WritableComparable,
return new PartitionStats();
}
- /**
- * Get the maximum number of partitions supported by Giraph.
- *
- * ZooKeeper has a limit of the data in a single znode of 1 MB,
- * and we write all partition descriptions to the same znode.
- *
- * If we are not using checkpointing, each partition owner is serialized
- * as 4 ints (16B), and we need some space to write the list of workers
- * there. 50k partitions is conservative enough.
- *
- * When checkpointing is used, we need enough space to write all the
- * checkpoint file paths.
- *
- * @return Maximum number of partitions allowed
- */
- private int getMaxPartitions() {
- if (conf.useCheckpointing()) {
- return 5000;
- } else {
- return 50000;
- }
- }
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
index bb6e38d..599ea0c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
@@ -23,13 +23,8 @@ import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
/**
* Implements hash-based partitioning from the id hash code.
@@ -73,43 +68,8 @@ public class HashWorkerPartitioner<I extends WritableComparable,
WorkerInfo myWorkerInfo,
Collection<? extends PartitionOwner> masterSetPartitionOwners,
PartitionStore<I, V, E, M> partitionStore) {
- partitionOwnerList.clear();
- partitionOwnerList.addAll(masterSetPartitionOwners);
-
- Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
- Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
- new HashMap<WorkerInfo, List<Integer>>();
- for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
- if (partitionOwner.getPreviousWorkerInfo() == null) {
- continue;
- } else if (partitionOwner.getWorkerInfo().equals(
- myWorkerInfo) &&
- partitionOwner.getPreviousWorkerInfo().equals(
- myWorkerInfo)) {
- throw new IllegalStateException(
- "updatePartitionOwners: Impossible to have the same " +
- "previous and current worker info " + partitionOwner +
- " as me " + myWorkerInfo);
- } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
- dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
- } else if (partitionOwner.getPreviousWorkerInfo().equals(
- myWorkerInfo)) {
- if (workerPartitionOwnerMap.containsKey(
- partitionOwner.getWorkerInfo())) {
- workerPartitionOwnerMap.get(
- partitionOwner.getWorkerInfo()).add(
- partitionOwner.getPartitionId());
- } else {
- List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
- tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
- workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
- tmpPartitionOwnerList);
- }
- }
- }
-
- return new PartitionExchange(dependentWorkerSet,
- workerPartitionOwnerMap);
+ return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
+ myWorkerInfo, masterSetPartitionOwners, partitionStore);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
index bdbd467..2befa9c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
@@ -18,18 +18,20 @@
package org.apache.giraph.partition;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
-
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
+import java.util.Set;
/**
* Helper class for balancing partitions across a set of workers.
@@ -284,5 +286,61 @@ public class PartitionBalancer {
return partitionOwnerList;
}
+
+ /**
+ * Helper function to update partition owners and determine which
+ * partitions need to be sent from a specific worker.
+ *
+ * @param partitionOwnerList Local {@link PartitionOwner} list for the
+ * given worker
+ * @param myWorkerInfo Worker info
+ * @param masterSetPartitionOwners Master set partition owners, received
+ * prior to beginning the superstep
+ * @param partitionStore Partition store for the given worker
+ * @return Information for the partition exchange.
+ */
+ public static PartitionExchange updatePartitionOwners(
+ List<PartitionOwner> partitionOwnerList,
+ WorkerInfo myWorkerInfo,
+ Collection<? extends PartitionOwner> masterSetPartitionOwners,
+ PartitionStore partitionStore) {
+ partitionOwnerList.clear();
+ partitionOwnerList.addAll(masterSetPartitionOwners);
+
+ Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
+ Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
+ new HashMap<WorkerInfo, List<Integer>>();
+ for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+ if (partitionOwner.getPreviousWorkerInfo() == null) {
+ continue;
+ } else if (partitionOwner.getWorkerInfo().equals(
+ myWorkerInfo) &&
+ partitionOwner.getPreviousWorkerInfo().equals(
+ myWorkerInfo)) {
+ throw new IllegalStateException(
+ "updatePartitionOwners: Impossible to have the same " +
+ "previous and current worker info " + partitionOwner +
+ " as me " + myWorkerInfo);
+ } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
+ dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
+ } else if (partitionOwner.getPreviousWorkerInfo().equals(
+ myWorkerInfo)) {
+ if (workerPartitionOwnerMap.containsKey(
+ partitionOwner.getWorkerInfo())) {
+ workerPartitionOwnerMap.get(
+ partitionOwner.getWorkerInfo()).add(
+ partitionOwner.getPartitionId());
+ } else {
+ List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
+ tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
+ workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
+ tmpPartitionOwnerList);
+ }
+ }
+ }
+
+ return new PartitionExchange(dependentWorkerSet,
+ workerPartitionOwnerMap);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index e472ac6..c83ca45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -18,6 +18,14 @@
package org.apache.giraph.partition;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.log4j.Logger;
+
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -26,13 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.giraph.graph.VertexEdgeCount;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
/**
* Helper class for {@link Partition} related operations.
*/
@@ -148,4 +149,75 @@ public class PartitionUtils {
getValue().getEdgeCount());
}
}
+
+ /**
+ * Compute the number of partitions, based on the configuration.
+ *
+ * If the user did not set {@link GiraphConstants#USER_PARTITION_COUNT}
+ * (i.e. it still equals its default), the count is
+ * multiplier * (number of available workers)^2, with the multiplier read
+ * from {@link GiraphConstants#PARTITION_COUNT_MULTIPLIER}, and it is never
+ * less than 1. In every case the result is capped by getMaxPartitions(conf).
+ *
+ * @param availableWorkerInfos Available workers.
+ * @param maxWorkers Maximum number of workers (not used by this method).
+ * @param conf Configuration.
+ * @return Number of partitions for the job.
+ * @throws IllegalArgumentException if there are no available workers, or
+ * if an explicitly configured partition count is not positive.
+ */
+ public static int computePartitionCount(
+ Collection<WorkerInfo> availableWorkerInfos, int maxWorkers,
+ ImmutableClassesGiraphConfiguration conf) {
+ if (availableWorkerInfos.isEmpty()) {
+ throw new IllegalArgumentException(
+ "computePartitionCount: No available workers");
+ }
+
+ int workerCount = availableWorkerInfos.size();
+ int userPartitionCount = conf.getInt(GiraphConstants.USER_PARTITION_COUNT,
+ GiraphConstants.DEFAULT_USER_PARTITION_COUNT);
+ int partitionCount;
+ if (userPartitionCount == GiraphConstants.DEFAULT_USER_PARTITION_COUNT) {
+ float multiplier = conf.getFloat(
+ GiraphConstants.PARTITION_COUNT_MULTIPLIER,
+ GiraphConstants.DEFAULT_PARTITION_COUNT_MULTIPLIER);
+ // Default heuristic: workers^2 partitions scaled by the multiplier,
+ // but always at least one partition.
+ partitionCount =
+ Math.max((int) (multiplier * workerCount * workerCount), 1);
+ } else if (userPartitionCount <= 0) {
+ // A zero or negative partition count cannot produce a usable job.
+ throw new IllegalArgumentException(
+ "computePartitionCount: Invalid user partition count " +
+ userPartitionCount + ", must be positive");
+ } else {
+ partitionCount = userPartitionCount;
+ }
+
+ int maxPartitions = getMaxPartitions(conf);
+ if (partitionCount > maxPartitions) {
+ LOG.warn("computePartitionCount: " +
+ "Reducing the partitionCount to " + maxPartitions +
+ " from " + partitionCount);
+ partitionCount = maxPartitions;
+ }
+ // Log only after capping so the reported count is the one actually used.
+ if (LOG.isInfoEnabled()) {
+ LOG.info("computePartitionCount: Creating " +
+ partitionCount + ", default would have been " +
+ (workerCount * workerCount) + " partitions.");
+ }
+ return partitionCount;
+ }
+
+ /**
+ * Return the upper bound on the number of partitions Giraph will create.
+ *
+ * All partition descriptions are written to one ZooKeeper znode, and
+ * ZooKeeper limits the data stored in a single znode to 1 MB.
+ *
+ * Without checkpointing, each partition owner is serialized as 4 ints
+ * (16 bytes), plus some room for the list of workers, so a bound of
+ * 50k partitions leaves a comfortable margin.
+ *
+ * With checkpointing, the znode must also hold all the checkpoint file
+ * paths, so the bound is much lower.
+ *
+ * @param conf Configuration.
+ * @return Maximum number of partitions allowed
+ */
+ private static int getMaxPartitions(
+ ImmutableClassesGiraphConfiguration conf) {
+ return conf.useCheckpointing() ? 5000 : 50000;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
index f4d97a4..b3c63f6 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
@@ -21,6 +21,7 @@ import org.apache.giraph.BspCase;
import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark;
import org.apache.giraph.benchmark.PageRankComputation;
import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.giraph.io.formats.JsonBase64VertexInputFormat;
@@ -66,9 +67,9 @@ public class TestJsonBase64Format extends BspCase {
classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 2);
assertTrue(job.run(true));
@@ -91,9 +92,9 @@ public class TestJsonBase64Format extends BspCase {
classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
job = prepareJob(getCallingMethodName(), classes, outputPath3);
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
+ PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
+ PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 5);
assertTrue(job.run(true));
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index 2e12bdc..f7fa3f2 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -19,14 +19,16 @@
package org.apache.giraph;
import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.GeneratedVertexReader;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.partition.HashRangePartitionerFactory;
import org.apache.giraph.partition.PartitionBalancer;
-import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
+import org.apache.giraph.partition.SimpleLongRangePartitionerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,7 +44,7 @@ import static org.junit.Assert.assertTrue;
*/
public class TestGraphPartitioner extends BspCase {
public TestGraphPartitioner() {
- super(TestGraphPartitioner.class.getName());
+ super(TestGraphPartitioner.class.getName());
}
private void verifyOutput(FileSystem fs, Path outputPath)
@@ -50,7 +52,7 @@ public class TestGraphPartitioner extends BspCase {
// TODO: this is fragile (breaks with legit serialization changes)
final int correctLen = 120;
if (runningInDistributedMode()) {
- FileStatus [] fileStatusArr = fs.listStatus(outputPath);
+ FileStatus[] fileStatusArr = fs.listStatus(outputPath);
int totalLen = 0;
for (FileStatus fileStatus : fileStatusArr) {
if (fileStatus.getPath().toString().contains("/part-m-")) {
@@ -131,8 +133,10 @@ public class TestGraphPartitioner extends BspCase {
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
job.getConfiguration().setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- job.getConfiguration().setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- job.getConfiguration().setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ job.getConfiguration().setVertexInputFormatClass(
+ SimpleSuperstepVertexInputFormat.class);
+ job.getConfiguration().setVertexOutputFormatClass(
+ SimpleSuperstepVertexOutputFormat.class);
job.getConfiguration().setGraphPartitionerFactoryClass(
HashRangePartitionerFactory.class);
outputPath = getTempPath("testHashRangePartitioner");
@@ -158,5 +162,30 @@ public class TestGraphPartitioner extends BspCase {
GeneratedVertexReader.REVERSE_ID_ORDER, true);
assertTrue(job.run(true));
verifyOutput(hdfs, outputPath);
+
+ job = new GiraphJob("testSimpleRangePartitioner");
+ setupConfiguration(job);
+ job.getConfiguration().setVertexClass(
+ SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ job.getConfiguration().setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ job.getConfiguration().setMasterComputeClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ job.getConfiguration().setVertexInputFormatClass(
+ SimpleSuperstepVertexInputFormat.class);
+ job.getConfiguration().setVertexOutputFormatClass(
+ SimpleSuperstepVertexOutputFormat.class);
+
+ job.getConfiguration().setGraphPartitionerFactoryClass(
+ SimpleLongRangePartitionerFactory.class);
+ long readerVertices = job.getConfiguration().getLong(
+ GeneratedVertexReader.READER_VERTICES, -1);
+ job.getConfiguration().setLong(
+ GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, readerVertices);
+
+ outputPath = getTempPath("testSimpleRangePartitioner");
+ removeAndSetOutput(job, outputPath);
+ assertTrue(job.run(true));
+ verifyOutput(hdfs, outputPath);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
index cdf1f65..41f5e3c 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
@@ -19,20 +19,20 @@
package org.apache.giraph;
import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.examples.PartitionContextTestVertex;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.PartitionContextTestVertex;
import org.apache.giraph.examples.SimplePageRankVertex;
import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.partition.HashMasterPartitioner;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
-
import java.io.IOException;
+import static org.junit.Assert.assertTrue;
+
public class TestPartitionContext extends BspCase {
public TestPartitionContext() {
super(TestPartitionContext.class.getName());
@@ -65,7 +65,7 @@ public class TestPartitionContext extends BspCase {
PartitionContextTestVertex.NUM_VERTICES);
// Increase the number of partitions
job.getConfiguration().setInt(
- HashMasterPartitioner.USER_PARTITION_COUNT,
+ GiraphConstants.USER_PARTITION_COUNT,
PartitionContextTestVertex.NUM_PARTITIONS);
assertTrue(job.run(true));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
index 5e61596..f56d7e5 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
@@ -20,8 +20,8 @@ package org.apache.giraph.examples;
import org.apache.giraph.BspCase;
import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.partition.HashMasterPartitioner;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
@@ -81,7 +81,7 @@ public class TestPageRank extends BspCase {
conf.setNumComputeThreads(numComputeThreads);
// Set enough partitions to generate randomness on the compute side
if (numComputeThreads != 1) {
- conf.setInt(HashMasterPartitioner.USER_PARTITION_COUNT,
+ conf.setInt(GiraphConstants.USER_PARTITION_COUNT,
numComputeThreads * 5);
}
assertTrue(job.run(true));