You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/02/25 21:06:47 UTC

git commit: GIRAPH-535: Range-partitioning and edge locality benchmark (apresta)

Updated Branches:
  refs/heads/trunk 71eab655e -> 507959dcb


GIRAPH-535: Range-partitioning and edge locality benchmark (apresta)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/507959dc
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/507959dc
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/507959dc

Branch: refs/heads/trunk
Commit: 507959dcb4af9b5c5f26a97c237cdb7b6235e7a1
Parents: 71eab65
Author: Alessandro Presta <al...@fb.com>
Authored: Mon Feb 25 11:57:41 2013 -0800
Committer: Alessandro Presta <al...@fb.com>
Committed: Mon Feb 25 12:06:27 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    4 +
 .../giraph/benchmark/AggregatorsBenchmark.java     |    5 +-
 .../apache/giraph/benchmark/PageRankBenchmark.java |   52 ++++++---
 .../giraph/benchmark/RandomMessageBenchmark.java   |    5 +-
 .../giraph/benchmark/ShortestPathsBenchmark.java   |    5 +-
 .../org/apache/giraph/conf/GiraphConstants.java    |   17 +++
 .../io/formats/PseudoRandomEdgeInputFormat.java    |   29 +++---
 .../io/formats/PseudoRandomVertexInputFormat.java  |   34 +++---
 .../giraph/partition/HashMasterPartitioner.java    |   80 ++------------
 .../giraph/partition/HashWorkerPartitioner.java    |   44 +-------
 .../apache/giraph/partition/PartitionBalancer.java |   66 +++++++++++-
 .../apache/giraph/partition/PartitionUtils.java    |   86 +++++++++++++-
 .../org/apache/giraph/io/TestJsonBase64Format.java |    9 +-
 .../org/apache/giraph/TestGraphPartitioner.java    |   39 ++++++-
 .../org/apache/giraph/TestPartitionContext.java    |   10 +-
 .../org/apache/giraph/examples/TestPageRank.java   |    4 +-
 16 files changed, 292 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3323438..d1a3d1e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,8 +1,12 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-535: Range-partitioning and edge locality benchmark (apresta)
+
   GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo)
 
+  GIRAPH-530: GiraphInputFormat#getSplits() should be aware of multithreaded input (apresta)
+
   GIRAPH-532: Give an explanation when trying to use unregistered aggregators (majakabiljo)
 
   GIRAPH-453: Pure Hive I/O (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
index a82a9f8..4e47042 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.worker.DefaultWorkerContext;
 import org.apache.giraph.vertex.EdgeListVertex;
@@ -257,10 +258,10 @@ public class AggregatorsBenchmark implements Tool {
         AggregatorsBenchmarkWorkerContext.class);
     job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+        PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
         Long.parseLong(cmd.getOptionValue('V')));
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+        PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
         1);
     job.getConfiguration().setInt(AGGREGATORS_NUM,
         Integer.parseInt(cmd.getOptionValue('a')));

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 8341dce..06ee80c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -25,10 +25,12 @@ import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.combiner.DoubleSumCombiner;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
 import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat;
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
+import org.apache.giraph.partition.SimpleLongRangePartitionerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -90,6 +92,15 @@ public class PageRankBenchmark implements Tool {
             "EdgeInputFormat), " +
             "7 for MultiGraphByteArrayVertex with unsafe (using " +
             "EdgeInputFormat))");
+    options.addOption("l",
+        "localEdgesMinRatio",
+        true,
+        "Minimum ratio of partition-local edges (default is 0)");
+    options.addOption("p",
+        "partitioner",
+        true,
+        "Partitioning algorithm (0 for hash partitioning (default), " +
+            "1 for range partitioning)");
     options.addOption("N",
         "name",
         true,
@@ -140,7 +151,7 @@ public class PageRankBenchmark implements Tool {
 
     GiraphJob job = new GiraphJob(getConf(), name);
     GiraphConfiguration configuration = job.getConfiguration();
-    setVertexAndInputFormatClasses(cmd, configuration);
+    setClassesAndParameters(cmd, configuration);
 
     configuration.setWorkerConfiguration(workers, workers, 100.0f);
     configuration.setInt(
@@ -159,12 +170,13 @@ public class PageRankBenchmark implements Tool {
   }
 
   /**
-   * Set vertex class and input format class based on command-line arguments.
+   * Set vertex, input format, partitioner classes and related parameters
+   * based on command-line arguments.
    *
    * @param cmd Command line arguments
    * @param configuration Giraph job configuration
    */
-  protected void setVertexAndInputFormatClasses(
+  protected void setClassesAndParameters(
       CommandLine cmd, GiraphConfiguration configuration) {
     int vertexClassOption = cmd.hasOption('c') ? Integer.parseInt(
         cmd.getOptionValue('c')) : 1;
@@ -201,24 +213,24 @@ public class PageRankBenchmark implements Tool {
       configuration.setVertexCombinerClass(
           DoubleSumCombiner.class);
     }
+
     if (vertexClassOption <= 3) {
       configuration.setVertexInputFormatClass(
           PseudoRandomVertexInputFormat.class);
-      configuration.setLong(
-          PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
-          Long.parseLong(cmd.getOptionValue('V')));
-      configuration.setLong(
-          PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
-          Long.parseLong(cmd.getOptionValue('e')));
     } else {
-      configuration.setEdgeInputFormatClass(
-          PseudoRandomEdgeInputFormat.class);
-      configuration.setLong(
-          PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES,
-          Long.parseLong(cmd.getOptionValue('V')));
-      configuration.setLong(
-          PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX,
-          Long.parseLong(cmd.getOptionValue('e')));
+      configuration.setEdgeInputFormatClass(PseudoRandomEdgeInputFormat.class);
+    }
+    configuration.setLong(
+        PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
+        Long.parseLong(cmd.getOptionValue('V')));
+    configuration.setLong(
+        PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
+        Long.parseLong(cmd.getOptionValue('e')));
+    if (cmd.hasOption('l')) {
+      float localEdgesMinRatio = Float.parseFloat(cmd.getOptionValue('l'));
+      configuration.setFloat(
+          PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
+          localEdgesMinRatio);
     }
 
     int vertexOutputClassOption =
@@ -229,6 +241,12 @@ public class PageRankBenchmark implements Tool {
       configuration.setVertexOutputFormatClass(
           JsonBase64VertexOutputFormat.class);
     }
+
+    if (cmd.hasOption('p') &&
+        Integer.parseInt(cmd.getOptionValue('p')) == 1) {
+      configuration.setGraphPartitionerFactoryClass(
+          SimpleLongRangePartitionerFactory.class);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
index d0d80af..c8e33dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.aggregators.LongSumAggregator;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.vertex.EdgeListVertex;
 import org.apache.giraph.job.GiraphJob;
@@ -362,10 +363,10 @@ public class RandomMessageBenchmark implements Tool {
         RandomMessageBenchmarkMasterCompute.class);
     job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+        PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
         Long.parseLong(cmd.getOptionValue('V')));
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+        PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
         Long.parseLong(cmd.getOptionValue('e')));
     job.getConfiguration().setInt(
         SUPERSTEP_COUNT,

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 52bbac4..1843da9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.combiner.MinimumDoubleCombiner;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 import org.apache.giraph.vertex.EdgeListVertex;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
@@ -139,10 +140,10 @@ public class ShortestPathsBenchmark implements Tool {
     }
     job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+        PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
         Long.parseLong(cmd.getOptionValue('V')));
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+        PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
         Long.parseLong(cmd.getOptionValue('e')));
 
     boolean isVerbose = false;

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index e0aeba3..fcdd57b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -497,6 +497,23 @@ public interface GiraphConstants {
    */
   boolean USE_INPUT_SPLIT_LOCALITY_DEFAULT = true;
 
+  /** Multiplier for the current workers squared */
+  String PARTITION_COUNT_MULTIPLIER =
+      "partition.masterPartitionCountMultipler";
+  /** Default multiplier for current workers squared */
+  float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
+
+  /** Overrides default partition count calculation if not -1 */
+  String USER_PARTITION_COUNT =
+      "partition.userPartitionCount";
+  /** Default user partition count */
+  int DEFAULT_USER_PARTITION_COUNT = -1;
+
+  /** Vertex key space size for
+   * {@link org.apache.giraph.partition.SimpleRangeWorkerPartitioner}
+   */
+  String PARTITION_VERTEX_KEY_SPACE_SIZE = "partition.vertexKeySpaceSize";
+
   /** Java opts passed to ZooKeeper startup */
   String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts";
   /** Default java opts passed to ZooKeeper startup */

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
index d197925..2024863 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -46,13 +46,6 @@ import java.util.Set;
  */
 public class PseudoRandomEdgeInputFormat
     extends EdgeInputFormat<LongWritable, DoubleWritable> {
-  /** Set the number of aggregate vertices. */
-  public static final String AGGREGATE_VERTICES =
-      "pseudoRandomEdgeInputFormat.aggregateVertices";
-  /** Set the number of edges per vertex (pseudo-random destination). */
-  public static final String EDGES_PER_VERTEX =
-      "pseudoRandomEdgeInputFormat.edgesPerVertex";
-
   @Override
   public final List<InputSplit> getSplits(final JobContext context,
                                           final int minSplitCountHint)
@@ -103,6 +96,8 @@ public class PseudoRandomEdgeInputFormat
     private BspInputSplit bspInputSplit;
     /** Saved configuration */
     private ImmutableClassesGiraphConfiguration configuration;
+    /** Helper for generating pseudo-random local edges. */
+    private PseudoRandomLocalEdgesHelper localEdgesHelper;
 
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
@@ -111,10 +106,10 @@ public class PseudoRandomEdgeInputFormat
           context.getConfiguration());
       aggregateVertices =
           configuration.getLong(
-              PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES, 0);
+              PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
       if (aggregateVertices <= 0) {
         throw new IllegalArgumentException(
-            PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES + " <= 0");
+            PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
       }
       if (inputSplit instanceof BspInputSplit) {
         bspInputSplit = (BspInputSplit) inputSplit;
@@ -135,11 +130,16 @@ public class PseudoRandomEdgeInputFormat
                 " instead of " + BspInputSplit.class);
       }
       edgesPerVertex = configuration.getLong(
-          PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX, 0);
+          PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
       if (edgesPerVertex <= 0) {
         throw new IllegalArgumentException(
-            PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX + " <= 0");
+            PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
       }
+      float minLocalEdgesRatio = configuration.getFloat(
+          PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
+          PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
+      localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
+          minLocalEdgesRatio, configuration);
     }
 
     @Override
@@ -172,11 +172,10 @@ public class PseudoRandomEdgeInputFormat
     @Override
     public Edge<LongWritable, DoubleWritable> getCurrentEdge()
       throws IOException, InterruptedException {
-      LongWritable destVertexId;
+      LongWritable destVertexId = new LongWritable();
       do {
-        destVertexId =
-            new LongWritable(Math.abs(random.nextLong()) %
-                aggregateVertices);
+        destVertexId.set(localEdgesHelper.generateDestVertex(
+            currentVertexId.get(), random));
       } while (currentVertexDestVertices.contains(destVertexId));
       ++currentVertexEdgesRead;
       currentVertexDestVertices.add(destVertexId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
index 19bc3b8..4da8f9d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.io.formats;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.giraph.bsp.BspInputSplit;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Edge;
@@ -33,9 +35,6 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -52,13 +51,6 @@ import java.util.Set;
  */
 public class PseudoRandomVertexInputFormat<M extends Writable> extends
     VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> {
-  /** Set the number of aggregate vertices. */
-  public static final String AGGREGATE_VERTICES =
-      "pseudoRandomVertexInputFormat.aggregateVertices";
-  /** Set the number of edges per vertex (pseudo-random destination). */
-  public static final String EDGES_PER_VERTEX =
-      "pseudoRandomVertexInputFormat.edgesPerVertex";
-
   @Override
   public final List<InputSplit> getSplits(final JobContext context,
       final int minSplitCountHint) throws IOException, InterruptedException {
@@ -101,6 +93,8 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
     private BspInputSplit bspInputSplit;
     /** Saved configuration */
     private ImmutableClassesGiraphConfiguration configuration;
+    /** Helper for generating pseudo-random local edges. */
+    private PseudoRandomLocalEdgesHelper localEdgesHelper;
 
     /**
      * Default constructor for reflection.
@@ -115,10 +109,10 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
           context.getConfiguration());
       aggregateVertices =
         configuration.getLong(
-          PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0);
+          PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
       if (aggregateVertices <= 0) {
         throw new IllegalArgumentException(
-            PseudoRandomVertexInputFormat.AGGREGATE_VERTICES + " <= 0");
+            PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
       }
       if (inputSplit instanceof BspInputSplit) {
         bspInputSplit = (BspInputSplit) inputSplit;
@@ -139,11 +133,16 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
             " instead of " + BspInputSplit.class);
       }
       edgesPerVertex = configuration.getLong(
-        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 0);
+          PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
       if (edgesPerVertex <= 0) {
         throw new IllegalArgumentException(
-          PseudoRandomVertexInputFormat.EDGES_PER_VERTEX + " <= 0");
+          PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
       }
+      float minLocalEdgesRatio = configuration.getFloat(
+          PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
+          PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
+      localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
+          minLocalEdgesRatio, configuration);
     }
 
     @Override
@@ -166,11 +165,10 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
           Lists.newArrayListWithCapacity((int) edgesPerVertex);
       Set<LongWritable> destVertices = Sets.newHashSet();
       for (long i = 0; i < edgesPerVertex; ++i) {
-        LongWritable destVertexId = null;
+        LongWritable destVertexId = new LongWritable();
         do {
-          destVertexId =
-            new LongWritable(Math.abs(rand.nextLong()) %
-              aggregateVertices);
+          destVertexId.set(
+              localEdgesHelper.generateDestVertex(vertexId, rand));
         } while (destVertices.contains(destVertexId));
         edges.add(EdgeFactory.create(destVertexId,
             new DoubleWritable(rand.nextDouble())));

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
index a9611d9..5faf367 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
@@ -18,17 +18,17 @@
 
 package org.apache.giraph.partition;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
 /**
  * Master will execute a hash based partitioning.
  *
@@ -41,24 +41,10 @@ import org.apache.log4j.Logger;
 public class HashMasterPartitioner<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> implements
     MasterGraphPartitioner<I, V, E, M> {
-  /** Multiplier for the current workers squared */
-  public static final String PARTITION_COUNT_MULTIPLIER =
-    "hash.masterPartitionCountMultipler";
-  /** Default mulitplier for current workers squared */
-  public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
-  /** Overrides default partition count calculation if not -1 */
-  public static final String USER_PARTITION_COUNT =
-    "hash.userPartitionCount";
-  /** Default user partition count */
-  public static final int DEFAULT_USER_PARTITION_COUNT = -1;
   /** Class logger */
   private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
   /** Provided configuration */
   private ImmutableClassesGiraphConfiguration conf;
-  /** Specified partition count (overrides calculation) */
-  private final int userPartitionCount;
-  /** Partition count (calculated in createInitialPartitionOwners) */
-  private int partitionCount = -1;
   /** Save the last generated partition owner list */
   private List<PartitionOwner> partitionOwnerList;
 
@@ -69,44 +55,15 @@ public class HashMasterPartitioner<I extends WritableComparable,
    */
   public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
-    userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
-        DEFAULT_USER_PARTITION_COUNT);
   }
 
   @Override
   public Collection<PartitionOwner> createInitialPartitionOwners(
       Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
-    if (availableWorkerInfos.isEmpty()) {
-      throw new IllegalArgumentException(
-          "createInitialPartitionOwners: No available workers");
-    }
+    int partitionCount = PartitionUtils.computePartitionCount(
+        availableWorkerInfos, maxWorkers, conf);
     List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
     Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
-    if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
-      float multiplier = conf.getFloat(
-          PARTITION_COUNT_MULTIPLIER,
-          DEFAULT_PARTITION_COUNT_MULTIPLIER);
-      partitionCount =
-          Math.max((int) (multiplier * availableWorkerInfos.size() *
-              availableWorkerInfos.size()),
-              1);
-    } else {
-      partitionCount = userPartitionCount;
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("createInitialPartitionOwners: Creating " +
-        partitionCount + ", default would have been " +
-        (availableWorkerInfos.size() *
-         availableWorkerInfos.size()) + " partitions.");
-    }
-    int maxPartitions = getMaxPartitions();
-    if (partitionCount > maxPartitions) {
-      LOG.warn("createInitialPartitionOwners: " +
-          "Reducing the partitionCount to " + maxPartitions +
-          " from " + partitionCount);
-      partitionCount = maxPartitions;
-    }
-
     for (int i = 0; i < partitionCount; ++i) {
       PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
       if (!workerIt.hasNext()) {
@@ -151,26 +108,5 @@ public class HashMasterPartitioner<I extends WritableComparable,
     return new PartitionStats();
   }
 
-  /**
-   * Get the maximum number of partitions supported by Giraph.
-   *
-   * ZooKeeper has a limit of the data in a single znode of 1 MB,
-   * and we write all partition descriptions to the same znode.
-   *
-   * If we are not using checkpointing, each partition owner is serialized
-   * as 4 ints (16B), and we need some space to write the list of workers
-   * there. 50k partitions is conservative enough.
-   *
-   * When checkpointing is used, we need enough space to write all the
-   * checkpoint file paths.
-   *
-   * @return Maximum number of partitions allowed
-   */
-  private int getMaxPartitions() {
-    if (conf.useCheckpointing()) {
-      return 5000;
-    } else {
-      return 50000;
-    }
-  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
index bb6e38d..599ea0c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
@@ -23,13 +23,8 @@ import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 /**
  * Implements hash-based partitioning from the id hash code.
@@ -73,43 +68,8 @@ public class HashWorkerPartitioner<I extends WritableComparable,
       WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
       PartitionStore<I, V, E, M> partitionStore) {
-    partitionOwnerList.clear();
-    partitionOwnerList.addAll(masterSetPartitionOwners);
-
-    Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
-    Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
-        new HashMap<WorkerInfo, List<Integer>>();
-    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
-      if (partitionOwner.getPreviousWorkerInfo() == null) {
-        continue;
-      } else if (partitionOwner.getWorkerInfo().equals(
-          myWorkerInfo) &&
-          partitionOwner.getPreviousWorkerInfo().equals(
-              myWorkerInfo)) {
-        throw new IllegalStateException(
-            "updatePartitionOwners: Impossible to have the same " +
-                "previous and current worker info " + partitionOwner +
-                " as me " + myWorkerInfo);
-      } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
-        dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
-      } else if (partitionOwner.getPreviousWorkerInfo().equals(
-          myWorkerInfo)) {
-        if (workerPartitionOwnerMap.containsKey(
-            partitionOwner.getWorkerInfo())) {
-          workerPartitionOwnerMap.get(
-              partitionOwner.getWorkerInfo()).add(
-                  partitionOwner.getPartitionId());
-        } else {
-          List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
-          tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
-          workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
-                                      tmpPartitionOwnerList);
-        }
-      }
-    }
-
-    return new PartitionExchange(dependentWorkerSet,
-        workerPartitionOwnerMap);
+    return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
+        myWorkerInfo, masterSetPartitionOwners, partitionStore);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
index bdbd467..2befa9c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
@@ -18,18 +18,20 @@
 
 package org.apache.giraph.partition;
 
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
-
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
+import java.util.Set;
 
 /**
  * Helper class for balancing partitions across a set of workers.
@@ -284,5 +286,61 @@ public class PartitionBalancer {
 
     return partitionOwnerList;
   }
+
+  /**
+   * Adopts the master's partition assignment for this worker and works out
+   * the partition exchange between this worker and the others.
+   *
+   * @param partitionOwnerList Local {@link PartitionOwner} list for the
+   *                           given worker; cleared and refilled in place
+   * @param myWorkerInfo Worker info of the calling worker
+   * @param masterSetPartitionOwners Master set partition owners, received
+   *        prior to beginning the superstep
+   * @param partitionStore Partition store (not used by this method)
+   * @return Previous owners of the partitions this worker now holds, plus
+   *         a map from each current owner to the ids of its partitions
+   *         that this worker previously owned
+   * @throws IllegalStateException if a partition names this worker as both
+   *         its previous and current owner
+   */
+  public static PartitionExchange updatePartitionOwners(
+      List<PartitionOwner> partitionOwnerList,
+      WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners,
+      PartitionStore partitionStore) {
+    partitionOwnerList.clear();
+    partitionOwnerList.addAll(masterSetPartitionOwners);
+
+    // Previous owners of partitions that now belong to this worker.
+    Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
+    // Current owner -> ids of partitions this worker previously owned.
+    Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
+        new HashMap<WorkerInfo, List<Integer>>();
+    for (PartitionOwner owner : masterSetPartitionOwners) {
+      WorkerInfo previous = owner.getPreviousWorkerInfo();
+      if (previous == null) {
+        continue;  // no previous worker recorded: nothing to exchange
+      }
+      WorkerInfo current = owner.getWorkerInfo();
+      boolean ownedByMe = current.equals(myWorkerInfo);
+      if (ownedByMe && previous.equals(myWorkerInfo)) {
+        throw new IllegalStateException(
+            "updatePartitionOwners: Impossible to have the same " +
+                "previous and current worker info " + owner +
+                " as me " + myWorkerInfo);
+      }
+      if (ownedByMe) {
+        dependentWorkerSet.add(previous);
+      } else if (previous.equals(myWorkerInfo)) {
+        List<Integer> outgoing = workerPartitionOwnerMap.get(current);
+        if (outgoing == null) {
+          outgoing = new ArrayList<Integer>();
+          workerPartitionOwnerMap.put(current, outgoing);
+        }
+        outgoing.add(owner.getPartitionId());
+      }
+    }
+    return new PartitionExchange(dependentWorkerSet, workerPartitionOwnerMap);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index e472ac6..c83ca45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -18,6 +18,14 @@
 
 package org.apache.giraph.partition;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.log4j.Logger;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -26,13 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.giraph.graph.VertexEdgeCount;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 /**
  * Helper class for {@link Partition} related operations.
  */
@@ -148,4 +149,75 @@ public class PartitionUtils {
               getValue().getEdgeCount());
     }
   }
+
+  /**
+   * Compute the number of partitions, based on the configuration.
+   *
+   * @param availableWorkerInfos Available workers; must be non-empty.
+   * @param maxWorkers Maximum number of workers (currently unused).
+   * @param conf Configuration.
+   * @return Number of partitions for the job, at least 1.
+   * @throws IllegalArgumentException if no workers are available or the
+   *         user-specified partition count is not positive.
+   */
+  public static int computePartitionCount(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers,
+      ImmutableClassesGiraphConfiguration conf) {
+    if (availableWorkerInfos.isEmpty()) {
+      throw new IllegalArgumentException(
+          "computePartitionCount: No available workers");
+    }
+    int workerCount = availableWorkerInfos.size();
+    int partitionCount = conf.getInt(GiraphConstants.USER_PARTITION_COUNT,
+        GiraphConstants.DEFAULT_USER_PARTITION_COUNT);
+    if (partitionCount == GiraphConstants.DEFAULT_USER_PARTITION_COUNT) {
+      float multiplier = conf.getFloat(
+          GiraphConstants.PARTITION_COUNT_MULTIPLIER,
+          GiraphConstants.DEFAULT_PARTITION_COUNT_MULTIPLIER);
+      partitionCount =
+          Math.max((int) (multiplier * workerCount * workerCount), 1);
+    } else if (partitionCount < 1) {
+      // Fail fast instead of returning a non-positive partition count.
+      throw new IllegalArgumentException("computePartitionCount: " +
+          "Partition count must be positive, got " + partitionCount);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("computePartitionCount: Creating " +
+          partitionCount + ", default would have been " +
+          (workerCount * workerCount) + " partitions.");
+    }
+    int maxPartitions = getMaxPartitions(conf);
+    if (partitionCount > maxPartitions) {
+      LOG.warn("computePartitionCount: " +
+          "Reducing the partitionCount to " + maxPartitions +
+          " from " + partitionCount);
+      partitionCount = maxPartitions;
+    }
+    return partitionCount;
+  }
+
+  /**
+   * Get the maximum number of partitions supported by Giraph.
+   *
+   * All partition descriptions are written to a single ZooKeeper znode,
+   * and ZooKeeper limits the data in one znode to 1 MB.
+   *
+   * Without checkpointing, each partition owner is serialized as 4 ints
+   * (16B) and the znode also holds the list of workers; 50k partitions
+   * is a conservative bound. With checkpointing, the znode must also fit
+   * all checkpoint file paths, so the bound is 5k.
+   *
+   * @param conf Configuration; only its checkpointing setting is read.
+   * @return 5000 when checkpointing is enabled, otherwise 50000.
+   */
+  private static int getMaxPartitions(
+      ImmutableClassesGiraphConfiguration conf) {
+    // Checkpoint file paths eat into the same 1 MB budget, hence the
+    // tenfold smaller limit when checkpointing is on.
+    int limitWithCheckpointing = 5000;
+    int limitWithoutCheckpointing = 50000;
+
+    return conf.useCheckpointing() ?
+        limitWithCheckpointing : limitWithoutCheckpointing;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
index f4d97a4..b3c63f6 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
@@ -21,6 +21,7 @@ import org.apache.giraph.BspCase;
 import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark;
 import org.apache.giraph.benchmark.PageRankComputation;
 import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.giraph.io.formats.JsonBase64VertexInputFormat;
@@ -66,9 +67,9 @@ public class TestJsonBase64Format extends BspCase {
     classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
+        PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
+        PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
     job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 2);
 
     assertTrue(job.run(true));
@@ -91,9 +92,9 @@ public class TestJsonBase64Format extends BspCase {
     classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
     job = prepareJob(getCallingMethodName(), classes, outputPath3);
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
+        PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
     job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
+        PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
     job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 5);
     assertTrue(job.run(true));
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index 2e12bdc..f7fa3f2 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -19,14 +19,16 @@
 package org.apache.giraph;
 
 import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.GeneratedVertexReader;
 import org.apache.giraph.examples.SimpleCheckpointVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.partition.HashRangePartitionerFactory;
 import org.apache.giraph.partition.PartitionBalancer;
-import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
+import org.apache.giraph.partition.SimpleLongRangePartitionerFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +44,7 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestGraphPartitioner extends BspCase {
   public TestGraphPartitioner() {
-      super(TestGraphPartitioner.class.getName());
+    super(TestGraphPartitioner.class.getName());
   }
 
   private void verifyOutput(FileSystem fs, Path outputPath)
@@ -50,7 +52,7 @@ public class TestGraphPartitioner extends BspCase {
     // TODO: this is fragile (breaks with legit serialization changes)
     final int correctLen = 120;
     if (runningInDistributedMode()) {
-      FileStatus [] fileStatusArr = fs.listStatus(outputPath);
+      FileStatus[] fileStatusArr = fs.listStatus(outputPath);
       int totalLen = 0;
       for (FileStatus fileStatus : fileStatusArr) {
         if (fileStatus.getPath().toString().contains("/part-m-")) {
@@ -131,8 +133,10 @@ public class TestGraphPartitioner extends BspCase {
         SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
     job.getConfiguration().setMasterComputeClass(
         SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
-    job.getConfiguration().setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-    job.getConfiguration().setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+    job.getConfiguration().setVertexInputFormatClass(
+        SimpleSuperstepVertexInputFormat.class);
+    job.getConfiguration().setVertexOutputFormatClass(
+        SimpleSuperstepVertexOutputFormat.class);
     job.getConfiguration().setGraphPartitionerFactoryClass(
         HashRangePartitionerFactory.class);
     outputPath = getTempPath("testHashRangePartitioner");
@@ -158,5 +162,30 @@ public class TestGraphPartitioner extends BspCase {
         GeneratedVertexReader.REVERSE_ID_ORDER, true);
     assertTrue(job.run(true));
     verifyOutput(hdfs, outputPath);
+
+    job = new GiraphJob("testSimpleRangePartitioner");
+    setupConfiguration(job);
+    job.getConfiguration().setVertexClass(
+        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    job.getConfiguration().setWorkerContextClass(
+        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+    job.getConfiguration().setMasterComputeClass(
+        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+    job.getConfiguration().setVertexInputFormatClass(
+        SimpleSuperstepVertexInputFormat.class);
+    job.getConfiguration().setVertexOutputFormatClass(
+        SimpleSuperstepVertexOutputFormat.class);
+
+    job.getConfiguration().setGraphPartitionerFactoryClass(
+        SimpleLongRangePartitionerFactory.class);
+    long readerVertices = job.getConfiguration().getLong(
+        GeneratedVertexReader.READER_VERTICES, -1);
+    job.getConfiguration().setLong(
+        GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, readerVertices);
+
+    outputPath = getTempPath("testSimpleRangePartitioner");
+    removeAndSetOutput(job, outputPath);
+    assertTrue(job.run(true));
+    verifyOutput(hdfs, outputPath);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
index cdf1f65..41f5e3c 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
@@ -19,20 +19,20 @@
 package org.apache.giraph;
 
 import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.examples.PartitionContextTestVertex;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.PartitionContextTestVertex;
 import org.apache.giraph.examples.SimplePageRankVertex;
 import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.partition.HashMasterPartitioner;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 
+import static org.junit.Assert.assertTrue;
+
 public class TestPartitionContext extends BspCase {
   public TestPartitionContext() {
     super(TestPartitionContext.class.getName());
@@ -65,7 +65,7 @@ public class TestPartitionContext extends BspCase {
         PartitionContextTestVertex.NUM_VERTICES);
     // Increase the number of partitions
     job.getConfiguration().setInt(
-        HashMasterPartitioner.USER_PARTITION_COUNT,
+        GiraphConstants.USER_PARTITION_COUNT,
         PartitionContextTestVertex.NUM_PARTITIONS);
     assertTrue(job.run(true));
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
index 5e61596..f56d7e5 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
@@ -20,8 +20,8 @@ package org.apache.giraph.examples;
 import org.apache.giraph.BspCase;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.partition.HashMasterPartitioner;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -81,7 +81,7 @@ public class TestPageRank extends BspCase {
     conf.setNumComputeThreads(numComputeThreads);
     // Set enough partitions to generate randomness on the compute side
     if (numComputeThreads != 1) {
-      conf.setInt(HashMasterPartitioner.USER_PARTITION_COUNT,
+      conf.setInt(GiraphConstants.USER_PARTITION_COUNT,
           numComputeThreads * 5);
     }
     assertTrue(job.run(true));